When adding new records, data warehouses often set a field that denotes when the data was ingested. This helps with tracking changes to data over time and with removing duplicates. In this tutorial, you'll learn how to set a `synced_at` column in BigQuery, and then how to do the same in a more portable way with SQLAlchemy.
Typical Use Case
To better understand a common use case, let's take a look at the raw `person` table below:
id | first_name | last_name | favorite_color | synced_at |
---|---|---|---|---|
1 | Joshua | Trusty | red | 09-14-2023 |
1 | Joshua | Trusty | scarlet | 01-01-2023 |
2 | Misael | Gonzalez | red | 09-14-2023 |
Since the table is raw, it has an obvious duplicate: the person 'Joshua Trusty' appears twice. We may consider these rows duplicates if the first three columns constitute the table's primary key. In that case, the `synced_at` field helps us identify the most recent version of each record, as sketched in the query below. It is also helpful if we want to understand how a unique person changes their `favorite_color` over time. Here, we see that 'Joshua Trusty' had a favorite color of 'scarlet' at the beginning of the year, but later changed it to 'red'.
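For example, if we want to keep only the latest version of each person, one common pattern is to rank rows by `synced_at` within each key and keep the newest. Here's a minimal sketch, assuming the raw table lives at `tutorial.person`:

```sql
SELECT
  *
FROM
  `tutorial.person`
WHERE TRUE
QUALIFY
  ROW_NUMBER() OVER (
    PARTITION BY id, first_name, last_name
    ORDER BY synced_at DESC
  ) = 1
;
```

Against the table above, this keeps the 'red' row for Joshua (the most recent sync) and the single row for Misael.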
Clearly, a `synced_at` column has multiple uses for building data pipelines and understanding your data. Fortunately, the process of incorporating it is relatively straightforward. Let's discuss a quick and easy way to add a `synced_at` timestamp column in BigQuery.
Add `synced_at` to BigQuery
Via Table Creation
The easiest way to add `synced_at` is right when you are designing your table from the start, as in the example below:
CREATE OR REPLACE TABLE
`tutorial.person` (
id INT64,
first_name STRING,
last_name STRING,
favorite_color STRING,
synced_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
;
Here’s a brief explanation of the code block:
- The SQL code creates a table named `person` within the `tutorial` dataset.
- Most of the columns (except `synced_at`) are ordinary typed columns, such as `id` being an `INT64`.
- The `synced_at` column is configured with `DEFAULT CURRENT_TIMESTAMP`, ensuring that if its value is not explicitly set by a query, it will automatically be set to the current timestamp.
Now suppose we decided to insert a record into this table like so:
INSERT INTO
`tutorial.person` (
id,
first_name,
last_name,
favorite_color
)
VALUES
(1, "Joshua", "Trusty", "red")
;
Notice that we did not set the `synced_at` column in our query. Yet, when we select the only row in the table, we see that it has been populated for us:
id | first_name | last_name | favorite_color | synced_at |
---|---|---|---|---|
1 | Joshua | Trusty | red | 2023-10-14 16:12:44.138938 UTC |
If you don’t already have your table created, this is the cleanest way of doing so within BigQuery.
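One more note on the default: it only applies when the column is left out of the insert. If you ever need to backfill historical rows, you can still set `synced_at` explicitly. A hypothetical example:

```sql
INSERT INTO
  `tutorial.person` (
    id,
    first_name,
    last_name,
    favorite_color,
    synced_at
  )
VALUES
  (1, "Joshua", "Trusty", "scarlet", TIMESTAMP "2023-01-01 00:00:00 UTC")
;
```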
Via Table Alteration
Table alteration shouldn't be your first choice, but oftentimes we work with pre-existing infrastructure.
First, we'll recreate the `person` table, omitting the `synced_at` column:
CREATE OR REPLACE TABLE
`tutorial.person` (
id INT64,
first_name STRING,
last_name STRING,
favorite_color STRING
)
;
Now, we can alter the table and add the `synced_at` column:
ALTER TABLE `tutorial.person`
ADD COLUMN synced_at TIMESTAMP;
ALTER TABLE `tutorial.person` ALTER COLUMN synced_at SET DEFAULT CURRENT_TIMESTAMP;
UPDATE `tutorial.person` SET synced_at = CURRENT_TIMESTAMP WHERE TRUE;
Here’s a brief explanation of the code block:
- The first statement adds a new column, `synced_at`, of type `TIMESTAMP` to the existing `tutorial.person` table.
- The second statement sets the default value of the `synced_at` column to the current timestamp.
- The third statement updates all rows in the `tutorial.person` table, setting the `synced_at` column to the current timestamp. The `WHERE TRUE` condition ensures that all rows are updated.
At the time of writing, this approach requires three separate statements. If you try to accomplish it in fewer, you'll get an error message from BigQuery saying:
Add field with default value to an existing table schema is not supported
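For reference, this is the kind of single-statement shortcut that triggers that error (a hypothetical sketch, using the same column and default as above):

```sql
-- At the time of writing, BigQuery rejects adding a column
-- together with its default in a single statement:
ALTER TABLE `tutorial.person`
ADD COLUMN synced_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;
```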
For this reason, having the default value set at table creation makes the most sense. Next, we’ll explore a more flexible option provided by SQLAlchemy.
SQLAlchemy
I'm a huge fan of SQLAlchemy because it allows you to set up your tables in a largely cloud-agnostic way. Here's how we can define the same `person` table:
import json
from os import environ

import sqlalchemy as sa
from sqlalchemy.orm import declarative_base

Base = declarative_base()

your_project = 'edit_this'
your_dataset = 'edit_this'

# Here, the environment variable is expected to hold the service account
# JSON itself, rather than a path to a file
credentials = json.loads(
    environ.get("GOOGLE_APPLICATION_CREDENTIALS", "{}")
)


class Person(Base):
    __tablename__ = 'person'

    id = sa.Column(sa.Integer, primary_key=True)
    first_name = sa.Column(sa.String)
    last_name = sa.Column(sa.String)
    favorite_color = sa.Column(sa.String)
    synced_at = sa.Column(
        sa.TIMESTAMP,
        server_default=sa.func.current_timestamp(),
    )


engine = sa.create_engine(
    f'bigquery://{your_project}/{your_dataset}',
    credentials_info=credentials,
)

Base.metadata.create_all(engine)
Here’s a brief explanation of the code block:
- It defines `Base` as an instance of the declarative base class. This `Base` will be used as the base class for your SQLAlchemy models.
- The code defines a class named `Person`, which represents a table in the database. `id`, `first_name`, `last_name`, `favorite_color`, and `synced_at` are class attributes representing the columns in the `person` table. The `server_default=sa.func.current_timestamp()` argument on `synced_at` is SQLAlchemy's way of expressing the `DEFAULT CURRENT_TIMESTAMP` clause we wrote by hand earlier.
- An SQLAlchemy database engine is created using `sa.create_engine`. The engine specifies the database type (in this case, BigQuery), your project and dataset, and any credentials required for connecting to the database.
- Finally, `Base.metadata.create_all(engine)` creates the table in the database based on the `Person` class. This statement generates the SQL required to create the table structure from the class definition and executes it on the database.
If you'd like to get that code operational, you just need to include your specific credentials and other minor details. While it works out of the box for BigQuery (via the sqlalchemy-bigquery dialect), you can point the engine at any database you'd like. That means the code is flexible enough to adjust to your preferred data warehousing service.
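To see the default in action from Python, here's a minimal usage sketch. It assumes the `engine` and `Person` model defined above and that the BigQuery dialect is installed; session handling details may vary with your setup:

```python
from sqlalchemy.orm import Session

# Insert a row without touching synced_at; the server-side default fills it in.
with Session(engine) as session:
    session.add(
        Person(id=1, first_name="Joshua", last_name="Trusty", favorite_color="red")
    )
    session.commit()

    # Read the row back and inspect the populated timestamp.
    person = session.get(Person, 1)
    print(person.synced_at)
```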
Conclusion
I hope you enjoyed this short tutorial on how to keep track of different iterations of your data. This approach is extraordinarily helpful if you want to deduplicate, or just want to perform data modeling over a time series. If you’re just starting out with building your data pipelines, I recommend delving deeper into SQLAlchemy. The library can help you explore multiple data warehousing services before you commit to the one that best fits your use case. Thanks for reading.