Fact Tables

pygrametl provides multiple classes for representing fact tables. These classes enable facts to be loaded one at a time, as batches stored in memory, or in bulk from a file on disk. Facts with missing information can also be loaded and then updated later. For information about how to load facts in parallel, see Parallel. In the following examples, we use PostgreSQL as the RDBMS and psycopg2 as the database driver.

All of the following classes are currently implemented in the pygrametl.tables module.

FactTable

The most basic class for representing a fact table is FactTable. Before creating a FactTable object, an appropriate table must be created in the database, and a PEP 249 connection to the database must be created and wrapped by the class ConnectionWrapper. For more information about how database connections are used in pygrametl, see Database. The FactTable constructor must be given the table’s name, the attributes used as measures in the fact table, and the attributes referencing dimensions (keyrefs). Be aware that FactTable performs an insert in the database whenever the FactTable.insert() method is called, which can very quickly become a bottleneck.

import psycopg2
import pygrametl
from pygrametl.tables import FactTable

# The actual database connection is handled by a PEP 249 connection
pgconn = psycopg2.connect("""host='localhost' dbname='dw' user='dwuser'
                          password='dwpass'""")

# This ConnectionWrapper will be set as a default and is then implicitly
# used, but it is stored in conn so transactions can be committed and the
# connection closed
conn = pygrametl.ConnectionWrapper(connection=pgconn)

# This instance of FactTable connects to the table facttable in the
# database using the default connection wrapper created above
factTable = FactTable(
    name='facttable',
    measures=['price'],
    keyrefs=['storeid', 'productid', 'dateid'])

The above example shows the three-step process needed to connect an instance of FactTable to an existing database table. Firstly, a PEP 249 connection to the database is created. Then, an instance of ConnectionWrapper is created to provide a uniform interface to all types of database connections supported by pygrametl; this instance is also set as the default database connection to use for this ETL flow. Lastly, a FactTable is created as a representation of the actual database table.

Operations on the fact table are performed using three methods: FactTable.insert() inserts new facts directly into the fact table when they are passed to the method. FactTable.lookup() returns a fact if the database contains one with the given combination of keys referencing the dimensions. FactTable.ensure() combines FactTable.lookup() and FactTable.insert() by ensuring that a fact does not exist before inserting it. An example of each method and of the automatic name mapping can be seen below, where the fact table from the previous example is reused.

import psycopg2
import pygrametl
from pygrametl.tables import FactTable

# The actual database connection is handled by a PEP 249 connection
pgconn = psycopg2.connect("""host='localhost' dbname='dw' user='dwuser'
                          password='dwpass'""")

# This ConnectionWrapper will be set as a default and is then implicitly
# used, but it is stored in conn so transactions can be committed and the
# connection closed
conn = pygrametl.ConnectionWrapper(connection=pgconn)

# This instance of FactTable connects to the table facttable in the
# database using the default connection wrapper created above
factTable = FactTable(
    name='facttable',
    measures=['price'],
    keyrefs=['storeid', 'productid', 'dateid'])

# A list of facts ready to be inserted into the fact table
facts = [{'storeid': 1, 'productid': 13, 'dateid': 4, 'price': 50},
         {'storeid': 2, 'productid':  7, 'dateid': 4, 'price': 75},
         {'storeid': 1, 'productid':  7, 'dateid': 4, 'price': 50},
         {'storeid': 3, 'productid':  9, 'dateid': 4, 'price': 25}]

# The facts can be inserted using the insert method
for row in facts:
    factTable.insert(row)
conn.commit()

# Lookup returns the keys and measures given only the keys
row = factTable.lookup({'storeid': 1, 'productid': 13, 'dateid': 4})

# Ensure should be used when loading facts that might already be loaded
newFacts = [{'storeid': 2, 'itemid':  7, 'dateid': 4, 'price': 75},
            {'storeid': 1, 'itemid':  7, 'dateid': 4, 'price': 50},
            {'storeid': 1, 'itemid':  2, 'dateid': 7, 'price': 150},
            {'storeid': 3, 'itemid':  3, 'dateid': 6, 'price': 100}]

for row in newFacts:
    # The second argument makes ensure compare not only the keys but also
    # the measures of facts with the same keys, raising a ValueError if the
    # measures differ. The third argument renames itemid to productid using
    # a name mapping
    factTable.ensure(row, True, {'productid': 'itemid'})
conn.commit()
conn.close()

BatchFactTable

BatchFactTable loads facts into the fact table in batches instead of one at a time like FactTable. This reduces the number of round trips to the database, which improves the performance of the ETL flow. The size of each batch is determined by the batchsize parameter in the class’s constructor. BatchFactTable loads each batch using either the executemany() method specified in PEP 249 or a single SQL INSERT INTO facttable VALUES(...) statement, depending on the value passed to usemultirow in the class’s constructor. The ConnectionWrapper.commit() method must be called after all facts have been inserted into the fact table, both to ensure that the last batch is loaded into the database from memory and that the transaction is committed. A small example is shown after the note below.

Note

Both BatchFactTable.lookup() and BatchFactTable.ensure() force the current batch of facts to be inserted. This keeps them consistent with the facts already inserted into the fact table. Thus, using these methods can reduce the benefit of batching insertions.
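
The example below is a minimal sketch of how BatchFactTable can be used. It assumes the same facttable as in the FactTable examples above; the batch size of 5000 is an arbitrary value chosen for illustration.

import psycopg2
import pygrametl
from pygrametl.tables import BatchFactTable

# The actual database connection is handled by a PEP 249 connection
pgconn = psycopg2.connect("""host='localhost' dbname='dw' user='dwuser'
                          password='dwpass'""")

conn = pygrametl.ConnectionWrapper(connection=pgconn)

# Facts are buffered in memory and loaded in batches of 5000
factTable = BatchFactTable(
    name='facttable',
    measures=['price'],
    keyrefs=['storeid', 'productid', 'dateid'],
    batchsize=5000)

facts = [{'storeid': 1, 'productid': 13, 'dateid': 4, 'price': 50},
         {'storeid': 2, 'productid':  7, 'dateid': 4, 'price': 75}]

for row in facts:
    factTable.insert(row)

# commit() ensures that the last partially filled batch is loaded into
# the database and that the transaction is committed
conn.commit()
conn.close()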

BulkFactTable

BulkFactTable also inserts facts in batches but writes the facts to a temporary file instead of keeping them in memory. Thus, the size of a batch is limited by the size of the disk instead of the amount of memory available. However, this prevents BulkFactTable.lookup() and BulkFactTable.ensure() from being implemented efficiently, so these methods are not available. Like for BatchFactTable, the method ConnectionWrapper.commit() must be called to ensure that the last batch of facts is loaded into the database. Multiple additional parameters have been added to the class’s constructor to provide control over the temporary file used to store facts, such as which delimiters to use and the number of facts to be bulk loaded in each batch. All of these parameters have a default value except for bulkloader. This parameter must be passed a function that will be called for each batch of facts to be loaded. This is necessary as the exact way to perform bulk loading differs from RDBMS to RDBMS.

func(name, attributes, fieldsep, rowsep, nullval, filehandle):

Required signature of a function that bulk loads data from a file into an RDBMS in pygrametl. For more information about bulk loading, see Bulk Loading.

Arguments:

  • name: the name of the fact table in the data warehouse.

  • attributes: a list containing the sequence of attributes constituting the primary key of the fact table, as well as the measures.

  • fieldsep: the string used to separate fields in the temporary file.

  • rowsep: the string used to separate rows in the temporary file.

  • nullval: if BulkFactTable was passed a string to substitute None values with, then that string is passed; otherwise, None is passed.

  • filehandle: either the name of the file or the file object itself, depending upon the value of BulkFactTable.usefilename. Using the filename is necessary if the bulk loading is invoked through SQL (instead of directly via a method on the PEP 249 driver). It is also necessary if the bulkloader runs in another process.

In the following example, a BulkFactTable is used to bulk load facts into a data warehouse using the function pgbulkloader(). For information about how to bulk load data into other RDBMSs, see Bulk Loading.

import psycopg2
import pygrametl
from pygrametl.tables import BulkFactTable

pgconn = psycopg2.connect("""host='localhost' dbname='dw' user='dwuser'
                          password='dwpass'""")

conn = pygrametl.ConnectionWrapper(connection=pgconn)

facts = [{'storeid': 1, 'productid': 13, 'dateid': 4, 'price': 50},
         {'storeid': 2, 'productid':  7, 'dateid': 4, 'price': 75},
         {'storeid': 1, 'productid':  7, 'dateid': 4, 'price': 50},
         {'storeid': 3, 'productid':  9, 'dateid': 4, 'price': 25}]


# This function bulk loads a file into PostgreSQL using psycopg2
def pgbulkloader(name, attributes, fieldsep, rowsep, nullval, filehandle):
    cursor = conn.cursor()
    # psycopg2 does not accept the default value used to represent NULL
    # by BulkFactTable, which is None. Here it is ignored as there are no
    # NULL values to substitute with a more descriptive value
    cursor.copy_from(file=filehandle, table=name, sep=fieldsep,
                     columns=attributes)


# The bulk loading function must be passed to BulkFactTable's constructor
factTable = BulkFactTable(
    name='facttable',
    measures=['price'],
    keyrefs=['storeid', 'productid', 'dateid'],
    bulkloader=pgbulkloader)

# commit() and close() must be called to ensure that all facts have been
# inserted into the database and that the connection is closed correctly
# afterward
for row in facts:
    factTable.insert(row)
conn.commit()
conn.close()
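
If the RDBMS requires bulk loading to be invoked through SQL, usefilename=True must be passed to BulkFactTable’s constructor so the bulkloader receives the name of the temporary file instead of the file object. The sketch below is a hypothetical example assuming a connection to an RDBMS that supports a MySQL-style LOAD DATA statement; the exact SQL differs between RDBMSs, see Bulk Loading.

# A sketch of a bulk loader that is invoked through SQL. It assumes that
# conn wraps a connection to an RDBMS supporting LOAD DATA, e.g., MySQL,
# and that BulkFactTable was created with usefilename=True so filehandle
# is the name of the temporary file
def sqlbulkloader(name, attributes, fieldsep, rowsep, nullval, filehandle):
    cursor = conn.cursor()
    cursor.execute("LOAD DATA LOCAL INFILE '{}' INTO TABLE {} "
                   "FIELDS TERMINATED BY '{}' LINES TERMINATED BY '{}' ({})"
                   .format(filehandle, name, fieldsep, rowsep,
                           ', '.join(attributes)))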

AccumulatingSnapshotFactTable

AccumulatingSnapshotFactTable represents a fact table where facts are updated as a process evolves. Typically, different date references (OrderDate, PaymentDate, ShipDate, DeliveryDate, etc.) are set when they become known. Measures (e.g., measuring the lag between the different dates) are also often set as they become available. Like FactTable, the class AccumulatingSnapshotFactTable performs an insert in the database whenever the AccumulatingSnapshotFactTable.insert() method is called. The following example illustrates how to create an instance of the class:

import psycopg2
import pygrametl
from pygrametl.tables import AccumulatingSnapshotFactTable

# The actual database connection is handled by a PEP 249 connection
pgconn = psycopg2.connect("""host='localhost' dbname='dw' user='dwuser'
                          password='dwpass'""")

# This ConnectionWrapper will be set as a default and is then implicitly
# used, but it is stored in conn so transactions can be committed and the
# connection closed
conn = pygrametl.ConnectionWrapper(connection=pgconn)


# A factexpander can be used to modify a row only if it has been updated. Note
# that namemapping is ignored here only for brevity; production code should
# use it
def computelag(row, namemapping, updated):
    if 'shipmentdateid' in updated:
        row['shipmentlag'] = row['shipmentdateid'] - row['paymentdateid']
    if 'deliverydateid' in updated:
        row['deliverylag'] = row['deliverydateid'] - row['shipmentdateid']


# This instance of AccumulatingSnapshotFactTable connects to the table
# orderprocessing in the database using the connection created above
asft = AccumulatingSnapshotFactTable(
    name='orderprocessing',
    keyrefs=['orderid', 'customerid', 'productid'],
    otherrefs=['paymentdateid', 'shipmentdateid', 'deliverydateid'],
    measures=['price', 'shipmentlag', 'deliverylag'],
    factexpander=computelag)

Firstly, a PEP 249 connection is created to perform the actual database operations. Then, an instance of ConnectionWrapper is created as a uniform wrapper around the PEP 249 connection and is set as the default database connection for this ETL flow. Next, a user-defined function to compute the lag measures is defined. Lastly, an AccumulatingSnapshotFactTable is created.

As stated, AccumulatingSnapshotFactTable.insert() inserts new facts directly into the fact table when they are passed to the method, and AccumulatingSnapshotFactTable.lookup() checks if the database contains a fact with the given combination of keys referencing the dimensions. These methods behave in the same way as in FactTable. The method AccumulatingSnapshotFactTable.update() will, based on the keyrefs, find the fact and update it if there are any differences in otherrefs and measures. The method AccumulatingSnapshotFactTable.ensure() checks if the row it is given already exists in the database table. If it does not exist, it is immediately inserted. If it exists, the method will see if some of the values for otherrefs or measures have been updated in the passed row and, if so, update the row in the database. Before that, it will, however, run the factexpander if one was given to AccumulatingSnapshotFactTable.__init__() when the object was created. Note that the generated SQL for lookups and updates uses the keyrefs in the WHERE clause, so an index on them should be considered. An example of how to use the class can be seen below:

import psycopg2
import pygrametl
from pygrametl.tables import AccumulatingSnapshotFactTable

# The actual database connection is handled by a PEP 249 connection
pgconn = psycopg2.connect("""host='localhost' dbname='dw' user='dwuser'
                          password='dwpass'""")

# This ConnectionWrapper will be set as a default and is then implicitly
# used, but it is stored in conn so transactions can be committed and the
# connection closed
conn = pygrametl.ConnectionWrapper(connection=pgconn)


# A factexpander can be used to modify a row only if it has been updated. Note
# that namemapping is ignored here only for brevity; production code should
# use it
def computelag(row, namemapping, updated):
    if 'shipmentdateid' in updated:
        row['shipmentlag'] = row['shipmentdateid'] - row['paymentdateid']
    if 'deliverydateid' in updated:
        row['deliverylag'] = row['deliverydateid'] - row['shipmentdateid']


# This instance of AccumulatingSnapshotFactTable connects to the table
# orderprocessing in the database using the connection created above
asft = AccumulatingSnapshotFactTable(
    name='orderprocessing',
    keyrefs=['orderid', 'customerid', 'productid'],
    otherrefs=['paymentdateid', 'shipmentdateid', 'deliverydateid'],
    measures=['price', 'shipmentlag', 'deliverylag'],
    factexpander=computelag)

# A list of facts that are ready to be inserted into the fact table
facts = [{'orderid': 1, 'customerid': 1, 'productid': 1, 'price': 10},
         {'orderid': 2, 'customerid': 2, 'productid': 2, 'price': 20},
         {'orderid': 3, 'customerid': 3, 'productid': 3, 'price': 30}]

# The facts can be inserted using the ensure method. (If we had used the
# insert method instead, we should have made sure the facts above had a
# value for each attribute in the fact table. When using ensure, missing
# attributes will be set to None before an insertion.)
for row in facts:
    asft.ensure(row)

# Now assume that the orders get paid and shipped
facts[0]['paymentdateid'] = 12
facts[0]['shipmentdateid'] = 14
facts[2]['paymentdateid'] = 11

# Update the accumulating fact table in the DW
for row in facts:
    asft.ensure(row)  # will call computelag and do the needed updates

conn.commit()
conn.close()
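
AccumulatingSnapshotFactTable.lookup() and AccumulatingSnapshotFactTable.update() can also be called directly. The following is a minimal sketch that assumes the setup from the example above, before the connection is closed; the dateid value is made up for illustration.

# lookup() returns the fact with the given combination of keyrefs
fact = asft.lookup({'orderid': 3, 'customerid': 3, 'productid': 3})

# update() finds the fact based on the keyrefs and updates the otherrefs
# and measures that differ from the values in the given row; here the lag
# is computed manually instead of relying on the factexpander
fact['shipmentdateid'] = 15
fact['shipmentlag'] = fact['shipmentdateid'] - fact['paymentdateid']
asft.update(fact)

conn.commit()
conn.close()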