Continuamos con los flujos translíticos, pero no vamos a seguir avanzando por ahora en nuevas funciones, vamos a seguir la función de inserción, ya que después del post anterior y las charlas que ello ha generado. Gracias a Roberto Carrancio y Nelson López (sino los conocéis os recomiendo hacerlo). He decidido profundizar un poco más en las consecuencias que puede tener la inserción de datos “a lo loco” en la base de datos y lo que puede conllevar (sí, he intentado ponerme en la piel de un DBA. Roberto, perdóname 😊 ) 

En el post de la semana pasada, teníamos la siguiente arquitectura montada: 

Diagrama

El contenido generado por IA puede ser incorrecto., Imagen 

 

En primer lugar, nuestro SQL On Premise, el encargado de recoger y agrupar la información de nuestro sistema. Esa información, la rescatábamos con un flujo de datos Gen2 (bloque 2). Y después de hacer nuestras transformaciones, la integramos en una SQL Database en Fabric (paso 3). Y por último en Power BI, mediante las UDF, podemos lanzar consultas a Fabric como por ejemplo la inserción de datos que vimos. Hasta aquí, todo correcto. Pero ¿qué ocurriría en las siguientes casuísticas? 

  1. El usuario X, realiza una inserción repetida. Es decir, introduce dos veces el mismo registro en nuestra tabla de dimensión, por ejemplo. 
  2. ¿Qué pasa si desde nuestra SQL On Premise se introduce un valor que ya existe en nuestra SQL Database creado por el usuario? O qué ocurre con los registros creados, ¿se reemplazan por los que vienen de nuevo de nuestra SQL On-premise? 

Ante estos dos escenarios, he intentado plantear una solución que evite estos conflictos entre nosotros y nuestros DBAs, así como lo más importante, una distorsión en nuestros datos. 

Vamos con la primera casuística, y que mejor manera de saber que es lo que ocurre que haciéndolo nosotros mismos. Para ello, vamos a insertar el mismo registro 4 que ya existe en la base de datos. 

Interfaz de usuario gráfica, Aplicación

El contenido generado por IA puede ser incorrecto., Imagen 

 

Ejecutamos la función de inserción de datos que creamos la semana pasada, y vemos que una vez que termina el proceso nos aparece el siguiente mensaje: 

 

Interfaz de usuario gráfica, Texto, Aplicación, Correo electrónico

El contenido generado por IA puede ser incorrecto., Imagen 

 

Y si nos vamos a la vista de datos, podemos ver que nos ha introducido el mismo registro, independientemente de si este ya existía o no. 

 

Tabla

El contenido generado por IA puede ser incorrecto., Imagen 

 

Esto a ya me genera un problema, y no es otro que nuestros datos ya no tienen la calidad que deben tener y es porque hemos “abierto la puerta” a nuestro modelo. ¿Está bien esta opción? Aquí como todo, en mi humilde opinión, depende. Tiene mucho potencial, pero debemos de controlarlo muy bien. ¿Pero cómo puede ser que SQL Database de Fabric nos deje introducir un valor duplicado? Lo correcto sería decir “duplicado”. ¿Y por qué entre comillas? Por que realmente no es un valor duplicado. Si vamos a la SQL Database: 

 

Imagen 

 

Y si previsualizamos los datos que tenemos almacenados en la tabla DimChannel: 

 

Imagen 

 

Vemos una columna al final que se llama MSSQL_System_Uniquifier_1941581955 (nombre fácil, sencillo y para toda la familia) que es un número entero el cual utiliza de forma de identificador único, por tanto, nunca se va a repetir un registro que introduzcamos. Pero podemos “forzar” un chequeo de los campos de nuestra tabla. ¿Cómo? Chequeando que el registro que se está intentando introducir no se encuentra en nuestra BDD. Para ello, vamos a modificar un poco el código de la función: 

 

# Importamos la librería 'fabric.functions', que contiene funciones necesarias para trabajar con Fabric y SQL 

import fabric.functions as fn 

 

# Importamos el módulo de logging para registrar información en el log 

import logging 

 

# Creamos una instancia de funciones personalizadas para usar UDFs 

udf = fn.UserDataFunctions() 

 

# Decorador que define la conexión a la base de datos con el alias "Flujos" 

@udf.connection(argName="sqlDB", alias="Flujos") 

 

# Decorador que declara esta función como una función personalizada (UDF) en Fabric 

@udf.function() 

def AddAnnotation(sqlDB: fn.FabricSqlConnection, ChannelKey: str, ChannelLabel: str, ChannelName: str, ChannelDescription: str) -> str: 

     

    logging.info('Python UDF trigger function processed a request.' 

 

    data = (ChannelKey, ChannelLabel, ChannelName, ChannelDescription) 

 

    connection = sqlDB.connect() 

    cursor = connection.cursor()     

 

    logging.info("Validando si el registro ya existe en DimChannel ...") 

     

    # Comprobamos si ya existe un registro con ese ChannelKey 

    validation_query = "SELECT 1 FROM [dbo].[DimChannel] WHERE ChannelKey = ?" 

    cursor.execute(validation_query, (ChannelKey,)) 

    exists = cursor.fetchone() 

 

    if exists: 

        logging.warning(f"El registro con ChannelKey '{ChannelKey}' ya existe. No se insertará duplicado.") 

        cursor.close() 

        connection.close() 

        return f" El registro con ChannelKey '{ChannelKey}' ya existe y no se ha insertado." 

     

    # Si no existe, insertamos los datos 

    insert_query = "INSERT INTO [dbo].[DimChannel] ([ChannelKey], [ChannelLabel], [ChannelName], [ChannelDescription]) VALUES (?, ?, ?, ?);" 

    cursor.execute(insert_query, data) 

     

    logging.info("Los datos se han añadido correctamente") 

 

    connection.commit() 

    cursor.close() 

    connection.close()                

     

    return "ChannelKey, ChannelLabel, ChannelName y ChannelDescription se han registrado correctamente." 

 

¿Qué hemos hecho en el código de la función? Hemos introducido una comprobación de los datos, que mira en este caso la columna ChannelKey y en caso de que exista, no hace la inserción, en caso contrario, realiza la inserción de los datos. 

 

Interfaz de usuario gráfica, Texto, Aplicación

El contenido generado por IA puede ser incorrecto., Imagen 

 

Con esta modificación, ya tenemos controlado ese aspecto, por lo que vamos a comprobarlo. Para ello, vamos a intentar insertar el registro 3. Para ello: 

 

Interfaz de usuario gráfica

El contenido generado por IA puede ser incorrecto., Imagen 

Ejecutamos la llamada a la UDF, y esperamos a que se ejecute. Tras unos segundos, nos aparece la siguiente ventana emergente: 

 

Interfaz de usuario gráfica, Texto, Aplicación, Correo electrónico

El contenido generado por IA puede ser incorrecto., Imagen 

 

En la que nos indica que no se ha insertado ya que existe el registro. Si vamos a la vista de datos para comprobarlo, vemos que no se ha insertado: 

 

Interfaz de usuario gráfica, Tabla

El contenido generado por IA puede ser incorrecto., Imagen 

 

De esta manera, controlamos nosotros mismos la inserción de posibles valores duplicados en nuestra base de datos y así podemos mantener la integridad de nuestros datos que se insertan desde Power BI. En este ejemplo hemos controlado una única columna, pero podríamos hacer la comprobación de las columnas que deseemos. 

Ya nos hemos ganado el café de hoy :) ¿Seguro?  

Vamos a darle una vuelta más de tuerca, y vamos a ponernos en el caso de que desde Power BI se inserta un dato que nos falta, por ejemplo, el registro 5 y sus datos asociados, pero como es un proceso manual, el usuario se equivoca e inserta el número 6. 

 

Interfaz de usuario gráfica, Aplicación

El contenido generado por IA puede ser incorrecto., Imagen 

 

Esto ya hace que perdamos la secuencia incremental de la ChannelKey y tengamos un salto en ella. ¿Cómo podemos evitar esto? Bien, realizando una pequeña modificación en nuestra UDF. Para ello escribimos: 

 

# -------------------------- IMPORTS Y CONFIG -------------------------- 

import fabric.functions as fn          # Funciones UDF de Microsoft Fabric 

import logging                         # Registro a nivel de aplicación 

 

udf = fn.UserDataFunctions()           # Factoría de decoradores UDF 

 

# Conexión: usa el alias “Flujos” que tienes definido en Fabric 

@udf.connection(argName="sqlDB", alias="Flujos") 

# Declaramos la función como UDF 

@udf.function() 

def AddAnnotation( 

        sqlDB: fn.FabricSqlConnection, 

        ChannelLabel: str, 

        ChannelName: str, 

        ChannelDescription: str 

    ) -> str: 

    """ 

    Inserta un registro en DimChannel asegurando que: 

      • ChannelKey = MAX(ChannelKey) + 1 

      • No se duplica ningún ChannelKey (prevención de colisión concurrente) 

    """ 

 

    logging.info("UDF 'AddAnnotation' iniciada") 

 

    # --------------------------- CONEXIÓN SQL --------------------------- 

    connection = sqlDB.connect() 

    cursor = connection.cursor() 

 

    # 1️ Calcular el siguiente ChannelKey 

    cursor.execute( 

        "SELECT ISNULL(MAX(ChannelKey), 0) + 1 FROM [dbo].[DimChannel];" 

    ) 

    next_channel_key = cursor.fetchone()[0] 

    logging.info(f"Siguiente ChannelKey calculado: {next_channel_key}") 

 

    # 2️ Validar que ese ChannelKey no se haya insertado ya (colisión concurrente) 

    cursor.execute( 

        "SELECT 1 FROM [dbo].[DimChannel] WHERE ChannelKey = ?;", 

        (next_channel_key,) 

    ) 

    if cursor.fetchone(): 

        connection.rollback() 

        cursor.close() 

        connection.close() 

        logging.warning( 

            f"Colisión concurrente: ChannelKey {next_channel_key} ya existe." 

        ) 

        return ( 

            f"El ChannelKey '{next_channel_key}' ya existe " 

            "por inserción concurrente; reintenta la operación." 

        ) 

 

    # 3️ Insertamos el nuevo registro 

    insert_query = """ 

        INSERT INTO [dbo].[DimChannel] 

            ([ChannelKey], [ChannelLabel], [ChannelName], [ChannelDescription]) 

        VALUES (?, ?, ?, ?); 

    """ 

    cursor.execute( 

        insert_query, 

        (next_channel_key, ChannelLabel, ChannelName, ChannelDescription) 

    ) 

    logging.info("Registro insertado en DimChannel") 

 

    # --------------------------- FINALIZAR ------------------------------ 

    connection.commit() 

    cursor.close() 

    connection.close() 

 

    return ( 

        f"Registro creado con ChannelKey {next_channel_key}: " 

        "ChannelLabel, ChannelName y ChannelDescription guardados." 

    ) 

 

 

Donde obtenemos el código de ChannelKey mayor de nuestra BDD y le incrementamos uno con el objetivo de impedir que haya un salto y evitando así que el usuario inserte un registro equivocado: 

 

Texto

El contenido generado por IA puede ser incorrecto., Imagen 

Ahora, vamos a nuestro Power BI y vamos a probar la solución si es viable. Para ello, hemos restaurado los valores en nuestra tabla con los siguientes datos: 

 

Interfaz de usuario gráfica

El contenido generado por IA puede ser incorrecto., Imagen 

 

Vemos que tenemos los valores del 1 al 5 en la columna ChannelKey, por tanto, el siguiente debería ser el 6. Como podéis ver, he eliminado la segmentación de texto correspondiente a ChannelKey, ya que, si todo va bien, esta debería rellenarse sola con el siguiente valor libre. Rellenamos los campos y ejecutamos la función: 

Interfaz de usuario gráfica, Aplicación

El contenido generado por IA puede ser incorrecto., Imagen 

Y nos devuelve la siguiente pantalla en la que podemos ver que nos ha añadido la columna ChannelKey con el número 6: 

 

Interfaz de usuario gráfica, Aplicación

El contenido generado por IA puede ser incorrecto., Imagen 

 

Por tanto, ya hemos controlado otro posible error en la inserción de los datos desde Power BI. En este caso, insisto, sólo miramos una columna de nuestra base de datos, pero podríamos mirar más columnas con el objetivo de realizar una combinación de ellas para evitar datos “inconsistentes”. 

 

¿Y por qué no nos ganamos también el croissant para ese cafecito? Para ello, vamos a seguir con la casuística dos que hemos comentado al inicio de este post. ¿Cómo podemos controlar la inserción del mismo registro desde Power BI y que no se borre o se duplique si viene desde nuestra SQL On-premise?  

 

Esto lo dejo para el siguiente post... :) 

¡Nos vemos en los datos!