Traditional vs modern analytics data processing (part 2)

In my previous article, I wrote about what errors on read, errors on write and silent errors are, and how data validation is one of the steps where we observe the most differences between traditional and modern analytics data processing.

We see that the tools in the modern data stack come with schema auto-discovery, but this can generate errors on read or even silent errors. An easy solution is to implement schema management and take advantage of the underlying storage system (a database or a data catalogue). Schema is infrastructure, and in another article I showcase Alembic for schema management (among other options).

But there will still be cases in which data validation steps are required after each processing step, so that the data is not corrupted along the way:

From RAW to Integration Layer

In the previous article we saw how the load from the landing zone to RAW is affected by not having schema management in place. In this article we will go through the load from RAW to the integration layer and experiment further with Mage. A reminder of the data model we will be working with:

Schema and load pattern

For this demo we will follow a daily batch load approach, which means that the source system exports the data once a day at a certain moment: this is called the cut-off and it is usually part of the data contract. A record delivered in a certain file therefore corresponds to the version that record had in the source system at the cut-off time, which makes the cut-off an essential field to store in the RAW layer alongside the data.

Depending on the storage and processing layer, one might be able to enable primary, foreign and unique keys. Having those in place at any layer ensures that neither the delivery nor the data process affected the integrity of the data.

It is worth mentioning, though, that having such keys in place might slow down the data load and might be expensive; depending on the use-case, that cost might still be reasonable. In the traditional way we would define the keys at schema level and disable and re-enable some of them at run time. This decision would be made based on performance metrics and, usually, the small dimension tables would keep their keys.
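As an illustration of that traditional pattern, here is a hedged sketch of dropping a foreign key before a bulk load and re-adding it afterwards (Postgres has no direct enable/disable for constraints, so drop and re-add is the closest equivalent; the table and constraint names are hypothetical):

from sqlalchemy import create_engine, text

engine = create_engine("postgresql://postgres:postgres@postgres_db:5432/postgres")

with engine.begin() as conn:
    # "Disable" the key for the duration of the bulk load
    conn.execute(text("alter table fact_order drop constraint if exists fk_fact_order_customer"))

    # ... bulk load into fact_order happens here (e.g. df.to_sql) ...

    # Re-enable the key; this is where integrity violations would surface
    conn.execute(text(
        "alter table fact_order add constraint fk_fact_order_customer "
        "foreign key (customer_pk) references dim_customer (customer_pk)"
    ))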

In modern data processing, one might not define the keys at all, but instead define checks to be executed after the load has completed. At the same time, modern storage and processing solutions rely on distributed systems and, thus, the data is not all in one place to be verified before the load (an extreme simplification of distributed systems).
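In that setup, the equivalent of a key is a check that runs after the load; a minimal sketch, assuming the same Postgres connection used elsewhere in this demo:

from sqlalchemy import create_engine, text

engine = create_engine("postgresql://postgres:postgres@postgres_db:5432/postgres")

# Post-load check: the natural key plus the cut-off date should identify exactly one row
duplicate_check = text("""
    select id, cut_off_date, count(*)
    from raw_customer
    group by id, cut_off_date
    having count(*) > 1
""")

with engine.connect() as conn:
    duplicates = conn.execute(duplicate_check).fetchall()

assert not duplicates, f'Duplicate keys found after the load: {duplicates}'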

In the diagram below the keys are defined for both the dimension and the fact table, and for demonstration's sake I will keep them enabled:

Before we implement the above flow in Mage, let's create 2 flows which generate the fake data and write it to a file, simulating the batch delivery from the source system (a minimal sketch of such a generator follows the file list):

The above flows will generate 2 files, which will follow the naming convention as stated in the data contract:

  • customers-{cut-off-date}.csv
  • orders-{cut-off-date}.csv

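A minimal sketch of what such a generator could look like, assuming the Faker package and the landing zone path used by the data loader later on (the actual flows live in the repo):

from faker import Faker
import pandas as pd

def generate_customers(cut_off_date: str, n: int = 10) -> None:
    # Generate n fake customers and write them with the agreed naming convention
    fake = Faker()
    df = pd.DataFrame({
        'id': range(1, n + 1),
        'email_address': [fake.email() for _ in range(n)],
        'first_name': [fake.first_name() for _ in range(n)],
        'last_name': [fake.last_name() for _ in range(n)],
    })
    filepath = f'/app/demo_error_read_write/data_setup/landing_zone/customers-{cut_off_date}.csv'
    df.to_csv(filepath, sep='\t', index=False)

generate_customers('2022-11-16')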
In this article we will implement the load up until dim_customer in Mage, following both the traditional and the modern way. We will run the flows with both correct and corrupted data and check out the differences.

Data loaders

A data loader in Mage is the step in which data is retrieved from a source and loaded into a dataframe, which is then available as input for the next step. We will be using a file loader, which reads from the landing zone and returns the data as a data frame.

from mage_ai.io.file import FileIO

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_data_from_file(*args, **kwargs):
    """
    Load the customers file for the given cut-off date from the landing zone.

    Docs: https://github.com/mage-ai/mage-ai/blob/master/docs/blocks/data_loading.md#fileio
    """
    filepath = f'/app/demo_error_read_write/data_setup/landing_zone/customers-{kwargs["cut_off_date"]}.csv'
    return FileIO().load(filepath, sep='\t')

Do you notice the cut-off date? It is configured as a variable at pipeline level so it can be used to retrieve the corresponding data for a certain run! In a production setup, though, cut_off_date would default to yesterday (for daily, scheduled runs).
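A hedged sketch of that default (the helper name is made up), which could be used in the data loader when the pipeline variable is not provided:

from datetime import date, timedelta

def resolve_cut_off_date(kwargs: dict) -> str:
    """Use the pipeline variable when provided, otherwise fall back to yesterday."""
    yesterday = (date.today() - timedelta(days=1)).isoformat()
    return kwargs.get('cut_off_date') or yesterday

# e.g. filepath = f'.../landing_zone/customers-{resolve_cut_off_date(kwargs)}.csv'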

Transformers

Transformers are the steps that transform the data, and Mage provides quite a few out of the box (forget about searching again and again for pandas aggregation syntax!). We will be using both Python and SQL transformers. An example of adding the cut_off_date to the dataframe:


from pandas import DataFrame

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer


@transformer
def transform_df(df: DataFrame, *args, **kwargs) -> DataFrame:
    """
    Add the cut-off date (a pipeline variable) as a column on the data frame.

    Args:
        df (DataFrame): Data frame from parent block.

    Returns:
        DataFrame: Transformed data frame
    """
    df["cut_off_date"] = kwargs["cut_off_date"]

    return df

Data exporters

A data exporter is the step in which data is exported (written) to a destination. We will be using it to write to Postgres, but we cannot use the out-of-the-box Mage connector: all the tables contain audit columns which are populated by the database and not by the data processing step (feature request). Therefore we will be using the pandas.to_sql method:

from sqlalchemy import create_engine

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data(df, **kwargs):
    """
    Append the prepared customer records to the raw_customer table in Postgres.

    Output (optional):
        Optionally return any object and it'll be logged and
        displayed when inspecting the block run.
    """
    engine = create_engine("postgresql://postgres:postgres@postgres_db:5432/postgres")
    conn = engine.connect()

    # Only the business columns are written; the audit columns are filled in by the database
    df[['id', 'email_address', 'first_name', 'last_name', 'cut_off_date']].to_sql(
        'raw_customer', schema='postgres', con=conn, if_exists='append', index=False)

    conn.close()

Slowly Changing Dimension

Coming from dimensional modelling, the concept of Slowly Changing Dimension (SCD) refers to dimension data which (you guessed it) changes slowly. In this demo we will implement SCD type 2 for the customer data in order to capture its versions. Even though in the modern stack we have time travel, that version is tied to the technical timestamp of the record. Here we will instead construct validity intervals based on the cut-off date, so we can retrieve the version a record had in the source system at a certain moment in time.

Let’s review the dim_customer keys:

  • PK: the primary key; it is unique across the entire table and is used to retrieve a single record
  • SK: the surrogate key; it uniquely identifies a customer in the integration layer
  • NK: the natural key; it uniquely identifies a customer in the data source layer (hence the SK is the combination of the NK and the source system)

For simplification’s sake, we will map customer_sk and customer_nk to the primary key of the record in the source system.

In order to build SCD, we need to:

  • get all the records from raw for the processing cut-off date
  • compare the records with dim and mark them for insert or for update
  • if the record already exists in dim, close the current version in dim

Let’s implement it with SQL:

    select
        rc.id as customer_sk,
        rc.id as customer_nk,
        'ONE' as source_system,
        rc.email_address,
        rc.first_name,
        rc.last_name,
        rc.cut_off_date as valid_from,
        '9999-12-31'::date as valid_to,
         null as changed_fields
    from raw_customer rc
    left join dim_customer dc on (
        rc.id = dc.customer_nk and dc.valid_to='9999-12-31')
    where rc.cut_off_date=???
    and dc.customer_pk is null

We first do an anti join between raw and dim; by doing so we mark for insert the records for customers we have never loaded before. Then we construct the new version of existing customers and mark them for insert too, making sure we load a new version only if it is different from the current one:

    select
        rc.id as customer_sk,
        rc.id as customer_nk,
        'ONE' as source_system,
        rc.email_address,
        rc.first_name,
        rc.last_name,
        rc.cut_off_date as valid_from,
        '9999-12-31'::date as valid_to,
        array[
           case when rc.email_address <> dc.email_address then 'email_address' end, 
           case when rc.first_name <> dc.first_name then 'first_name' end,
           case when rc.last_name <> dc.last_name then 'last_name' end
        ] as changed_fields
    from raw_customer rc
    inner join dim_customer dc on (
        rc.id = dc.customer_nk)
    where rc.cut_off_date=???
    and dc.valid_to='9999-12-31'
    and array[rc.email_address, rc.first_name, rc.last_name] <> array[dc.email_address, dc.first_name, dc.last_name]

And finally, with Postgres 15 we can make use of the MERGE operation, which lets us insert the records above and close the existing versions:

MERGE into postgres.dim_customer tgt
using (
    select
        rc.id as customer_sk,
        rc.id as customer_nk,
        'ONE' as source_system,
        rc.email_address,
        rc.first_name,
        rc.last_name,
        rc.cut_off_date as valid_from,
        '9999-12-31'::date as valid_to,
        null as changed_fields
    from raw_customer rc
    left join dim_customer dc on (
        rc.id = dc.customer_nk and dc.valid_to='9999-12-31')
    where rc.cut_off_date=???
    and dc.customer_pk is null
    union all
    select
        rc.id as customer_sk,
        rc.id as customer_nk,
        'ONE' as source_system,
        rc.email_address,
        rc.first_name,
        rc.last_name,
        rc.cut_off_date as valid_from,
        '9999-12-31'::date as valid_to,
        array[
           case when rc.email_address <> dc.email_address then 'email_address' end, 
           case when rc.first_name <> dc.first_name then 'first_name' end,
           case when rc.last_name <> dc.last_name then 'last_name' end
        ] as changed_fields
    from raw_customer rc
    inner join dim_customer dc on (
        rc.id = dc.customer_nk)
    where rc.cut_off_date=???
    and dc.valid_to='9999-12-31'
    and array[rc.email_address, rc.first_name, rc.last_name] <> array[dc.email_address, dc.first_name, dc.last_name]
) src
on (tgt.customer_sk = src.customer_sk and tgt.valid_to='9999-12-31')
when matched then update set valid_to = ??? - interval '1 day', updated_datetime = current_timestamp
when not matched then
insert (customer_sk, customer_nk, source_system, email_address, first_name, last_name, valid_from, valid_to)
Values (src.customer_sk, src.customer_nk, src.source_system, src.email_address, src.first_name, src.last_name, src.valid_from, src.valid_to);

To showcase the modern way I have decided to use dbt. Mage offers support for dbt models, but because dbt snapshot is still a feature request, I decided to run dbt snapshot with subprocess (do not do this at home!).
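A hedged sketch of that workaround (the project path is an assumption; the point is only to shell out to dbt from a Mage block and fail the block when dbt fails):

import subprocess

def run_dbt_snapshot(project_dir: str = '/app/dbt/demo') -> None:
    # Workaround until dbt snapshots are natively supported in Mage
    result = subprocess.run(
        ['dbt', 'snapshot', '--project-dir', project_dir, '--profiles-dir', project_dir],
        capture_output=True,
        text=True,
    )
    print(result.stdout)
    if result.returncode != 0:
        raise RuntimeError(result.stderr)

The snapshot itself: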

{% snapshot dim_customer_dbt %}

    {{
        config(
          target_schema='public',
          strategy='check',
          unique_key='customer_nk',
          updated_at='cut_off_date',
          check_cols=['email_address', 'first_name', 'last_name'],
        )
    }}

    select id as customer_nk,
           'ONE' as source_system,
           id as customer_sk,
           email_address,
           first_name,
           last_name,
           cut_off_date
    from raw_customers_pandas

{% endsnapshot %}

The above is the SCD implementation: for each record identified by the unique_key, dbt checks the fields configured in check_cols and versions the record based on the cut_off_date.

Putting the pieces together we have the below two flows:

Executing with OK data

For this demo I have generated OK data for 2 dates:

  • 2022-11-16
  • 2022-12-13

By setting the cut-off date on each pipeline, the execution picks up the file corresponding to that cut-off date and processes it. The final data looks similar in both the modern and the traditional way:

postgres=# select count(*) from dim_customer;
 count 
-------
    18
(1 row)

postgres=# select * from dim_customer where customer_nk=1;
 customer_pk | customer_sk | customer_nk | source_system |        email_address        | first_name | last_name |            changed_fields            |     valid_from      |      valid_to       |     inserted_datetime      |      updated_datetime      
-------------+-------------+-------------+---------------+-----------------------------+------------+-----------+--------------------------------------+---------------------+---------------------+----------------------------+----------------------------
          19 |           1 |           1 | ONE           | corey86@jordan-gonzales.com | Jose       | Simon     | {email_address,first_name,last_name} | 2022-11-16 00:00:00 | 2022-12-12 00:00:00 | 2022-12-13 21:21:25.573795 | 2022-12-13 21:21:44.394117
          28 |           1 |           1 | ONE           | smithpeter@gmail.com        | Cynthia    | Chapman   |                                      | 2022-12-13 00:00:00 | 9999-12-31 00:00:00 | 2022-12-13 21:22:00.963905 | 2022-12-13 21:22:00.963905
(2 rows)
postgres=# select count(*) from dim_customer_dbt;
 count 
-------
    18
(1 row)
postgres=# select * from dim_customer_dbt where customer_nk=1;
 customer_nk | source_system | customer_sk |        email_address        | first_name | last_name | cut_off_date |            dbt_scd_id            | dbt_updated_at | dbt_valid_from| dbt_valid_to 
-------------+---------------+-------------+-----------------------------+------------+-----------+--------------+----------------------------------+----------------+----------------+--------------
           1 | ONE           |           1 | corey86@jordan-gonzales.com | Jose       | Simon     | 2022-11-16   | 98f453404558f384b1b54911bad40993 | 2022-11-16     | 2022-11-16    | 2022-12-13
           1 | ONE           |           1 | smithpeter@gmail.com        | Cynthia    | Chapman   | 2022-12-13   | 66e094744fa8f4a4a9d0ed6b2439ba52 | 2022-12-13     | 2022-12-13    | 
(2 rows)

So what are the key differences?

  1. In the dbt snapshot we do not have audit information. You can find more information about it in this article.
  2. In the dbt snapshot the primary key is the dbt_scd_id column: a combination of the unique_key and the cut_off_date. This could generate issues when reprocessing the same date with different data (*not tested)
  3. In the dbt snapshot the most recent records have dbt_valid_to set to null. That implies that users always have to apply a coalesce when filtering on the validity interval. Best practice is to use a date far in the future, so that the same SQL where clause (date between) can be used irrespective of the version a user would like to retrieve. (Example below)
  4. I consider the field dbt_updated_at misleading, as it contains the cut_off_date and not the timestamp of the dbt update (related to auditability)
  5. In the traditional way we can calculate which fields are driving the update (the changed_fields column). In some cases it is important to track changes for each field; if that is your case, I recommend checking out Data Vault and Anchor modelling, which solve this need.

postgres=# select count(*) from dim_customer_dbt where current_timestamp between cast(dbt_valid_from as date) and cast(dbt_valid_to as date);
 count 
-------
     0
(1 row)

postgres=# select count(*) from dim_customer where current_timestamp between valid_from and valid_to;
 count 
-------
     9
(1 row)
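One way to make the dbt snapshot usable with the same between filter is to coalesce the open-ended dbt_valid_to to a far-future date; a minimal sketch, reusing the connection string from the exporter:

from sqlalchemy import create_engine, text

engine = create_engine("postgresql://postgres:postgres@postgres_db:5432/postgres")

# Coalescing dbt_valid_to makes the snapshot behave like dim_customer when
# filtering on the validity interval
query = text("""
    select count(*)
    from dim_customer_dbt
    where current_timestamp between cast(dbt_valid_from as date)
      and coalesce(cast(dbt_valid_to as date), date '9999-12-31')
""")

with engine.connect() as conn:
    print(conn.execute(query).scalar())  # now also returns the current versions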

I also consider the SQL generated by dbt unnecessarily complex (one reason being that MERGE was only introduced in Postgres 15, so dbt cannot rely on it):

demo_mage_pg   |          create temporary table "dim_customer_dbt__dbt_tmp215301328009"
demo_mage_pg   |          as (
demo_mage_pg   |            with snapshot_query as (
demo_mage_pg   |            select id as customer_nk,
demo_mage_pg   |                   'ONE' as source_system,
demo_mage_pg   |                   id as customer_sk,
demo_mage_pg   |                   email_address,
demo_mage_pg   |                   first_name,
demo_mage_pg   |                   last_name,
demo_mage_pg   |                   cut_off_date
demo_mage_pg   |            from raw_customers_pandas
demo_mage_pg   |            ),
demo_mage_pg   |        
demo_mage_pg   |            snapshotted_data as (
demo_mage_pg   |        
demo_mage_pg   |                select *,
demo_mage_pg   |                    customer_nk as dbt_unique_key
demo_mage_pg   |        
demo_mage_pg   |                from "postgres"."public"."dim_customer_dbt"
demo_mage_pg   |                where dbt_valid_to is null
demo_mage_pg   |        
demo_mage_pg   |            ),
demo_mage_pg   |        
demo_mage_pg   |            insertions_source_data as (
demo_mage_pg   |        
demo_mage_pg   |                select
demo_mage_pg   |                    *,
demo_mage_pg   |                    customer_nk as dbt_unique_key,
demo_mage_pg   |                    cut_off_date as dbt_updated_at,
demo_mage_pg   |                    cut_off_date as dbt_valid_from,
demo_mage_pg   |                    nullif(cut_off_date, cut_off_date) as dbt_valid_to,
demo_mage_pg   |                    md5(coalesce(cast(customer_nk as varchar ), '')
demo_mage_pg   |                 || '|' || coalesce(cast(cut_off_date as varchar ), '')
demo_mage_pg   |                ) as dbt_scd_id
demo_mage_pg   |        
demo_mage_pg   |                from snapshot_query
demo_mage_pg   |            ),
demo_mage_pg   |        
demo_mage_pg   |            updates_source_data as (
demo_mage_pg   |        
demo_mage_pg   |                select
demo_mage_pg   |                    *,
demo_mage_pg   |                    customer_nk as dbt_unique_key,
demo_mage_pg   |                    cut_off_date as dbt_updated_at,
demo_mage_pg   |                    cut_off_date as dbt_valid_from,
demo_mage_pg   |                    cut_off_date as dbt_valid_to
demo_mage_pg   |        
demo_mage_pg   |                from snapshot_query
demo_mage_pg   |            ),
demo_mage_pg   |        
demo_mage_pg   |            insertions as (
demo_mage_pg   |        
demo_mage_pg   |                select
demo_mage_pg   |                    'insert' as dbt_change_type,
demo_mage_pg   |                    source_data.*
demo_mage_pg   |        
demo_mage_pg   |                from insertions_source_data as source_data
demo_mage_pg   |                left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
demo_mage_pg   |                where snapshotted_data.dbt_unique_key is null
demo_mage_pg   |                   or (
demo_mage_pg   |                        snapshotted_data.dbt_unique_key is not null
demo_mage_pg   |                    and (
demo_mage_pg   |                        (snapshotted_data."email_address" != source_data."email_address"
demo_mage_pg   |                or
demo_mage_pg   |                (
demo_mage_pg   |                    ((snapshotted_data."email_address" is null) and not (source_data."email_address" is null))
demo_mage_pg   |                    or
demo_mage_pg   |                    ((not snapshotted_data."email_address" is null) and (source_data."email_address" is null))
demo_mage_pg   |                ) or snapshotted_data."first_name" != source_data."first_name"
demo_mage_pg   |                or
demo_mage_pg   |                (
demo_mage_pg   |                    ((snapshotted_data."first_name" is null) and not (source_data."first_name" is null))
demo_mage_pg   |                    or
demo_mage_pg   |                    ((not snapshotted_data."first_name" is null) and (source_data."first_name" is null))
demo_mage_pg   |                ) or snapshotted_data."last_name" != source_data."last_name"
demo_mage_pg   |                or
demo_mage_pg   |                (
demo_mage_pg   |                    ((snapshotted_data."last_name" is null) and not (source_data."last_name" is null))
demo_mage_pg   |                    or
demo_mage_pg   |                    ((not snapshotted_data."last_name" is null) and (source_data."last_name" is null))
demo_mage_pg   |                ))
demo_mage_pg   |                    )
demo_mage_pg   |                )
demo_mage_pg   |        
demo_mage_pg   |            ),
demo_mage_pg   |        
demo_mage_pg   |            updates as (
demo_mage_pg   |        
demo_mage_pg   |                select
demo_mage_pg   |                    'update' as dbt_change_type,
demo_mage_pg   |                    source_data.*,
demo_mage_pg   |                    snapshotted_data.dbt_scd_id
demo_mage_pg   |        
demo_mage_pg   |                from updates_source_data as source_data
demo_mage_pg   |                join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
demo_mage_pg   |                where (
demo_mage_pg   |                    (snapshotted_data."email_address" != source_data."email_address"
demo_mage_pg   |                or
demo_mage_pg   |                (
demo_mage_pg   |                    ((snapshotted_data."email_address" is null) and not (source_data."email_address" is null))
demo_mage_pg   |                    or
demo_mage_pg   |                    ((not snapshotted_data."email_address" is null) and (source_data."email_address" is null))
demo_mage_pg   |                ) or snapshotted_data."first_name" != source_data."first_name"
demo_mage_pg   |                or
demo_mage_pg   |                (
demo_mage_pg   |                    ((snapshotted_data."first_name" is null) and not (source_data."first_name" is null))
demo_mage_pg   |                    or
demo_mage_pg   |                    ((not snapshotted_data."first_name" is null) and (source_data."first_name" is null))
demo_mage_pg   |                ) or snapshotted_data."last_name" != source_data."last_name"
demo_mage_pg   |                or
demo_mage_pg   |                (
demo_mage_pg   |                    ((snapshotted_data."last_name" is null) and not (source_data."last_name" is null))
demo_mage_pg   |                    or
demo_mage_pg   |                    ((not snapshotted_data."last_name" is null) and (source_data."last_name" is null))
demo_mage_pg   |                ))
demo_mage_pg   |                )
demo_mage_pg   |            )
demo_mage_pg   |        
demo_mage_pg   |            select * from insertions
demo_mage_pg   |            union all
demo_mage_pg   |            select * from updates
demo_mage_pg   |        
demo_mage_pg   |          );

Executing with corrupted data

For this demo the corrupted file will contain:

  • old records with some updates
  • new records with incorrect id
  • new records with correct id
  • duplicated records

id	email_address	first_name	last_name
8	phicks@dean-williams.com	Amanda	Lewis
8	phicks@dean-williams.com	Duplicated	Lewis
c	oholland@gmail.com	Christine	Williams
10	edoyle@doyle.net	Michael	Wagner
b	zacharyjordan@sutton.net	Lisa	Perez

Traditional way

Thanks to schema management, the traditional load pipeline fails at the step which loads data to raw. The data in dim_customer is not modified; hence the data is simply not refreshed.

sqlalchemy.exc.DataError: (psycopg2.errors.InvalidTextRepresentation) invalid input syntax for type integer: "c"
LINE 1: ...-williams.com', 'Amanda', 'Lewis', '2022-12-14'),('c', 'ohol...
                                                             ^
[SQL: INSERT INTO public.raw_customer (id, email_address, first_name, last_name, cut_off_date) VALUES (%(id)s, %(email_address)s, %(first_name)s, %(last_name)s, %(cut_off_date)s)]

Depending on the use-case, one might want to load the records which are correct and ignore the ones which are not. Below are a few ways to do so:

  1. Proactive: park the wrong records in an error table by checking upfront which records fit the data contract (see the sketch after this list)
  2. Reactive: delete the erroneous records from the raw file (after a backup) and restart the process
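A minimal sketch of the proactive approach, using the column names from this demo (the error-table name is made up):

import pandas as pd

def split_by_contract(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    # Keep rows whose id is an integer, as stated in the data contract;
    # park the rest so they can be written to an error table
    is_valid = pd.to_numeric(df['id'], errors='coerce').notna()
    return df[is_valid].copy(), df[~is_valid].copy()

# valid_df, error_df = split_by_contract(df)
# valid_df.to_sql('raw_customer', ...) and error_df.to_sql('err_customer', ...)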

Modern way

By default, the modern pipeline does not fail at the load to raw, because schema auto-discovery treated the id column as text.
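A minimal illustration of that inference (not the actual Mage code): with one non-numeric value in the file, pandas reads the whole id column as object, so it reaches Postgres as text.

import pandas as pd
from io import StringIO

corrupted = 'id\temail_address\n8\tphicks@dean-williams.com\nc\toholland@gmail.com\n'
df = pd.read_csv(StringIO(corrupted), sep='\t')
print(df['id'].dtype)  # object: the column is treated as text end to end

The dbt run, however, does fail: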

05:11:36  Running with dbt=1.3.1
05:11:36  [WARNING]: Configuration paths exist in your dbt_project.yml file which do not apply to any resources.
There are 1 unused configuration paths:
- models.demo.example

05:11:36  Found 1 model, 0 tests, 1 snapshot, 0 analyses, 289 macros, 0 operations, 0 seed files, 4 sources, 0 exposures, 0 metrics
05:11:36  
05:11:36  Concurrency: 1 threads (target='dev')
05:11:36  
05:11:36  1 of 1 START snapshot public.dim_customer_dbt .................................. [RUN]
05:11:36  1 of 1 ERROR snapshotting public.dim_customer_dbt .............................. [ERROR in 0.12s]
05:11:36  
05:11:36  Finished running 1 snapshot in 0 hours 0 minutes and 0.28 seconds (0.28s).
05:11:36  
05:11:36  Completed with 1 error and 0 warnings:
05:11:36  
05:11:36  Database Error in snapshot dim_customer_dbt (snapshots/dim_customer_dbt.sql)
05:11:36    operator does not exist: smallint = text
05:11:36    LINE 71: ...apshotted_data on snapshotted_data.dbt_unique_key = source_d...
05:11:36                                                                  ^
05:11:36    HINT:  No operator matches the given name and argument types. You might need to add explicit type casts.
05:11:36  
05:11:36  Done. PASS=0 WARN=0 ERROR=1 SKIP=0 TOTAL=1

When dbt snapshot was executed for the first time, the table was created with CREATE TABLE AS and Postgres assigned the data type smallint to the id column, which will start breaking at some point once the ids go out of range:

postgres=# update dim_customer_dbt set customer_nk = 32768 where customer_nk=9;
ERROR:  smallint out of range

To fix the process we now need to correct the data in the load-to-raw step, even though the error occurred in the dbt step. We can:

  1. Manage the schema of the raw table as we do in the traditional way (see the sketch after this list)
  2. Add validation steps on the data we prepare in the add_cut_off_date block, before the load step
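A hedged sketch of option 1, whether done via Alembic migrations (as in the earlier article) or, as below, with plain SQLAlchemy metadata; the column list is illustrative:

import sqlalchemy as sa

metadata = sa.MetaData()
raw_customers = sa.Table(
    'raw_customers_pandas', metadata,
    sa.Column('id', sa.Integer, nullable=False),   # a value like "c" now fails at load time
    sa.Column('email_address', sa.Text),
    sa.Column('first_name', sa.Text),
    sa.Column('last_name', sa.Text),
    sa.Column('cut_off_date', sa.Date, nullable=False),
)

engine = sa.create_engine("postgresql://postgres:postgres@postgres_db:5432/postgres")
metadata.create_all(engine)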

With the help of the @test decorator we can define validation rules in each step:

if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@test
def validate_(df) -> None:
    """
    Validate that the id column respects the data contract.
    """
    from pandas.api.types import is_integer_dtype
    assert df is not None, 'The output is undefined'
    assert is_integer_dtype(df.id)

Now the process fails at the validation step in add_cut_off_date, with the (not too detailed) error:

[add_cut_off_date] FAIL: validate_ (block: add_cut_off_date)
[add_cut_off_date] --------------------------------------------------------------
[add_cut_off_date] Traceback (most recent call last):
[add_cut_off_date]   File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/models/block/__init__.py", line 1110, in run_tests
[add_cut_off_date]     func(*outputs)
[add_cut_off_date]   File "<string>", line 36, in validate_
[add_cut_off_date] AssertionError

Let’s clean the records with wrong data and give it another try!

Input data:

id	email_address	first_name	last_name
8	phicks@dean-williams.com	Amanda	Lewis
8	phicks@dean-williams.com	Duplicated	Lewis
10	edoyle@doyle.net	Michael	Wagner

The traditional pipeline fails at the raw step with:

sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "pk_raw_customer_id"
DETAIL:  Key (id, cut_off_date)=(8, 2022-12-14) already exists.

The modern pipeline, on the other hand, runs successfully. Let's see how the data looks in the table:

postgres=# select customer_nk, email_address, first_name, last_name, dbt_valid_from, dbt_valid_to from dim_customer_dbt where customer_nk=8;
 customer_nk |      email_address       | first_name | last_name | dbt_valid_from | dbt_valid_to 
-------------+--------------------------+------------+-----------+----------------+--------------
           8 | qkirby@yahoo.com         | Eric       | Lopez     | 2022-11-16     | 2022-12-13
           8 | mariamartin@hotmail.com  | Thomas     | Gould     | 2022-12-13     | 2022-12-14
           8 | phicks@dean-williams.com | Amanda     | Lewis     | 2022-12-14     | 
           8 | phicks@dean-williams.com | Duplicated | Lewis     | 2022-12-14     | 
(4 rows)

The duplicated data reached the dim table, breaking data integrity. To prevent this, we need to add a uniqueness check as well:

@test
def validate_(df) -> None:
    """
    Validate that the id column respects the data contract and is unique.
    """
    from pandas.api.types import is_integer_dtype
    assert df is not None, 'The output is undefined'
    assert is_integer_dtype(df.id)
    assert df.id.is_unique

With this setup the data preparation step will fail with:

[add_cut_off_date] FAIL: validate_ (block: add_cut_off_date)
[add_cut_off_date] --------------------------------------------------------------
[add_cut_off_date] Traceback (most recent call last):
[add_cut_off_date]   File "/usr/local/lib/python3.10/site-packages/mage_ai/data_preparation/models/block/__init__.py", line 1110, in run_tests
[add_cut_off_date]     func(*outputs)
[add_cut_off_date]   File "<string>", line 37, in validate_
[add_cut_off_date] AssertionError

While the error trace does not say much, checking the code tells us which validation step failed. Or we can simply add a description to the assert statements:

    assert is_integer_dtype(df.id), 'The id is not of type integer'
    assert df.id.is_unique, 'The id is not unique'

which will generate extra information in the error:

[add_cut_off_date]   File "<string>", line 37, in validate_
[add_cut_off_date] AssertionError: The id is not unique

Conclusions

In this article we set up two data pipelines in Mage: one the modern way, using schema auto-discovery and dbt, and one the traditional way, using schema management and the MERGE operation. We saw that in the modern way errors happen on read or are silenced, unless we configure validation rules in our process.

Whether the data processing is modern or traditional, errors caused by wrong data being delivered (or retrieved) require reaching out to the source system and solving the issues there. Data processing platforms cannot replace the collaboration between teams, between data producers and data consumers, and even if the modern way has made data processing flexible, agile and in line with software development practices, collaboration is still an area which needs to be tackled in the data domain.

The next article in this series will be about how to process the order data and how data validation (or the lack thereof) might impact it. Until then, feel free to try it out yourself; the code is here.

Documentation

  1. Slowly Changing Dimension
  2. Postgres 15 Merge
  3. DBT Snapshot
