Fact Tables¶
pygrametl provides multiple classes for representing fact tables. These classes enable facts to be loaded one at a time, as batches stored in memory, or in bulk from a file on disk. Loading facts with missing information and updating them later is also supported. For information about how to load facts in parallel, see Parallel. In the following examples, we use PostgreSQL as the RDBMS and psycopg2 as the database driver.
All of the following classes are currently implemented in the pygrametl.tables module.
FactTable¶
The most basic class for representing a fact table is FactTable. Before creating a FactTable object, an appropriate table must be created in the database, and a PEP 249 connection to the database must be created and wrapped by the class ConnectionWrapper. For more information about how database connections are used in pygrametl, see Database. The FactTable constructor must be given the table's name, the attributes used as measures in the fact table, and the attributes referencing dimensions (keyrefs). Be aware that FactTable performs an insert in the database whenever the FactTable.insert() method is called, which can very quickly become a bottleneck.
import psycopg2
import pygrametl
from pygrametl.tables import FactTable

# The actual database connection is handled by a PEP 249 connection
pgconn = psycopg2.connect("""host='localhost' dbname='dw' user='dwuser'
                          password='dwpass'""")

# This ConnectionWrapper will be set as a default and is then implicitly
# used, but it is stored in conn so transactions can be committed and the
# connection closed
conn = pygrametl.ConnectionWrapper(connection=pgconn)

# This instance of FactTable connects to the table facttable in the
# database using the default connection wrapper created above
factTable = FactTable(
    name='facttable',
    measures=['price'],
    keyrefs=['storeid', 'productid', 'dateid'])
The above example shows the three-step process needed to connect an instance of FactTable to an existing database table. Firstly, a PEP 249 connection to the database is created. Then an instance of ConnectionWrapper is created to provide a uniform interface to all types of database connections supported by pygrametl. The instance of ConnectionWrapper is also set as the default database connection to use for this ETL flow. Lastly, a FactTable is created as a representation of the actual database table.
Operations on the fact table are done using three methods: FactTable.insert() inserts new facts directly into the fact table when they are passed to the method. FactTable.lookup() returns a fact if the database contains one with the given combination of keys referencing the dimensions. FactTable.ensure() combines FactTable.lookup() and FactTable.insert() by ensuring that a fact does not exist before inserting it. An example of each method and the automatic name mapping can be seen below, where the fact table from the last example is reused.
import psycopg2
import pygrametl
from pygrametl.tables import FactTable

# The actual database connection is handled by a PEP 249 connection
pgconn = psycopg2.connect("""host='localhost' dbname='dw' user='dwuser'
                          password='dwpass'""")

# This ConnectionWrapper will be set as a default and is then implicitly
# used, but it is stored in conn so transactions can be committed and the
# connection closed
conn = pygrametl.ConnectionWrapper(connection=pgconn)

# This instance of FactTable connects to the table facttable in the
# database using the default connection wrapper created above
factTable = FactTable(
    name='facttable',
    measures=['price'],
    keyrefs=['storeid', 'productid', 'dateid'])

# A list of facts ready to be inserted into the fact table
facts = [{'storeid': 1, 'productid': 13, 'dateid': 4, 'price': 50},
         {'storeid': 2, 'productid': 7, 'dateid': 4, 'price': 75},
         {'storeid': 1, 'productid': 7, 'dateid': 4, 'price': 50},
         {'storeid': 3, 'productid': 9, 'dateid': 4, 'price': 25}]

# The facts can be inserted using the insert method
for row in facts:
    factTable.insert(row)
conn.commit()

# lookup returns the keys and measures given only the keys
row = factTable.lookup({'storeid': 1, 'productid': 13, 'dateid': 4})

# ensure should be used when loading facts that might already be loaded
newFacts = [{'storeid': 2, 'itemid': 7, 'dateid': 4, 'price': 75},
            {'storeid': 1, 'itemid': 7, 'dateid': 4, 'price': 50},
            {'storeid': 1, 'itemid': 2, 'dateid': 7, 'price': 150},
            {'storeid': 3, 'itemid': 3, 'dateid': 6, 'price': 100}]
for row in newFacts:
    # The second argument forces ensure to not only match the keys for
    # facts to be considered equal, but also check whether the measures
    # are the same for facts with the same keys, raising a ValueError if
    # they are not. The third argument renames itemid to productid using
    # a name mapping
    factTable.ensure(row, True, {'productid': 'itemid'})
conn.commit()
conn.close()
BatchFactTable¶
BatchFactTable loads facts into the fact table in batches instead of one at a time like FactTable, thus reducing the number of round trips to the database and improving the performance of the ETL flow. The size of each batch is determined by the batchsize parameter added to the class's constructor. BatchFactTable loads each batch using either the executemany() method specified in PEP 249 or a single SQL INSERT INTO facttable VALUES(...) statement, depending on the value passed to usemultirow in the class's constructor. The ConnectionWrapper.commit() method must be called after all facts have been inserted into the fact table to ensure both that the last batch is loaded into the database from memory and that the transaction is committed.
Note
Both BatchFactTable.lookup() and BatchFactTable.ensure() force the current batch of facts to be inserted. This is done to keep them consistent with the facts already inserted into the fact table. Thus, using these methods can reduce the benefit of batching insertions.
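BatchFactTable shares FactTable's interface, so only the constructor call differs. The following is a minimal sketch of how the class can be used, assuming the same facttable as in the previous examples; the batchsize of 500 is an arbitrary value chosen for illustration.
import psycopg2
import pygrametl
from pygrametl.tables import BatchFactTable

pgconn = psycopg2.connect("""host='localhost' dbname='dw' user='dwuser'
                          password='dwpass'""")
conn = pygrametl.ConnectionWrapper(connection=pgconn)

# Facts are accumulated in memory and loaded 500 at a time; the batchsize
# of 500 is an arbitrary value chosen for this sketch
factTable = BatchFactTable(
    name='facttable',
    measures=['price'],
    keyrefs=['storeid', 'productid', 'dateid'],
    batchsize=500)

facts = [{'storeid': 1, 'productid': 13, 'dateid': 4, 'price': 50},
         {'storeid': 2, 'productid': 7, 'dateid': 4, 'price': 75}]
for row in facts:
    factTable.insert(row)

# commit() ensures that the last partially filled batch is also loaded
# into the database and that the transaction is committed
conn.commit()
conn.close()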
BulkFactTable¶
BulkFactTable also inserts facts in batches but writes the facts to a temporary file instead of keeping them in memory. Thus, the size of a batch is limited by the size of the disk instead of the amount of memory available. However, this prevents BulkFactTable.lookup() and BulkFactTable.ensure() from being implemented efficiently, so these methods are not available. Like for BatchFactTable, the method ConnectionWrapper.commit() must be called to ensure that the last batch of facts is loaded into the database. Multiple additional parameters have been added to the class's constructor to provide control over the temporary file used to store facts, such as which delimiters to use and the number of facts to be bulk loaded in each batch. All of these parameters have a default value except for bulkloader. This parameter must be passed a function that will be called for each batch of facts to be loaded. This is necessary as the exact way to perform bulk loading differs from RDBMS to RDBMS.
- func(name, attributes, fieldsep, rowsep, nullval, filehandle):
  Required signature of a function bulk loading data from a file into an RDBMS in pygrametl. For more information about bulk loading see Bulk Loading.
  Arguments:
  - name: the name of the fact table in the data warehouse.
  - attributes: a list containing both the sequence of attributes constituting the primary key of the fact table, as well as the measures.
  - fieldsep: the string used to separate fields in the temporary file.
  - rowsep: the string used to separate rows in the temporary file.
  - nullval: if BulkFactTable was passed a string to substitute None values with, then it will be passed; if not, then None is passed.
  - filehandle: either the name of the file or the file object itself, depending upon the value of BulkFactTable.usefilename. Using the filename is necessary if the bulk loading is invoked through SQL (instead of directly via a method on the PEP 249 driver). It is also necessary if the bulkloader runs in another process.
In the following example, a BulkFactTable is used to bulk load facts into a data warehouse using the function pgbulkloader(). For information about how to bulk load data into other RDBMSs, see Bulk Loading.
import psycopg2
import pygrametl
from pygrametl.tables import BulkFactTable

pgconn = psycopg2.connect("""host='localhost' dbname='dw' user='dwuser'
                          password='dwpass'""")
conn = pygrametl.ConnectionWrapper(connection=pgconn)

facts = [{'storeid': 1, 'productid': 13, 'dateid': 4, 'price': 50},
         {'storeid': 2, 'productid': 7, 'dateid': 4, 'price': 75},
         {'storeid': 1, 'productid': 7, 'dateid': 4, 'price': 50},
         {'storeid': 3, 'productid': 9, 'dateid': 4, 'price': 25}]

# This function bulk loads a file into PostgreSQL using psycopg2
def pgbulkloader(name, attributes, fieldsep, rowsep, nullval, filehandle):
    cursor = conn.cursor()
    # psycopg2 does not accept the default value used to represent NULL
    # by BulkFactTable, which is None. Here it is ignored as we have no
    # NULL values that we wish to substitute for a more descriptive value
    cursor.copy_from(file=filehandle, table=name, sep=fieldsep,
                     columns=attributes)

# The bulk loading function must be passed to BulkFactTable's constructor
factTable = BulkFactTable(
    name='facttable',
    measures=['price'],
    keyrefs=['storeid', 'productid', 'dateid'],
    bulkloader=pgbulkloader)

# commit() and close() must be called to ensure that all facts have been
# inserted into the database and that the connection is closed correctly
# afterward
for row in facts:
    factTable.insert(row)
conn.commit()
conn.close()
AccumulatingSnapshotFactTable¶
AccumulatingSnapshotFactTable represents a fact table where facts are updated as a process evolves. Typically, different date references (OrderDate, PaymentDate, ShipDate, DeliveryDate, etc.) are set when they become known. Measures (e.g., measuring the lag between the different dates) are also often set as they become available. Like FactTable, the class AccumulatingSnapshotFactTable performs an insert in the database whenever the AccumulatingSnapshotFactTable.insert() method is called. The following example illustrates how to create the class:
import psycopg2
import pygrametl
from pygrametl.tables import AccumulatingSnapshotFactTable

# The actual database connection is handled by a PEP 249 connection
pgconn = psycopg2.connect("""host='localhost' dbname='dw' user='dwuser'
                          password='dwpass'""")

# This ConnectionWrapper will be set as a default and is then implicitly
# used, but it is stored in conn so transactions can be committed and the
# connection closed
conn = pygrametl.ConnectionWrapper(connection=pgconn)

# A factexpander can be used to modify a row only if it has been updated.
# Note that we ignore namemapping only for brevity; production code should
# use it
def computelag(row, namemapping, updated):
    if 'shipmentdateid' in updated:
        row['shipmentlag'] = row['shipmentdateid'] - row['paymentdateid']
    if 'deliverydateid' in updated:
        row['deliverylag'] = row['deliverydateid'] - row['shipmentdateid']

# This instance of AccumulatingSnapshotFactTable connects to the table
# orderprocessing in the database using the connection created above
asft = AccumulatingSnapshotFactTable(
    name='orderprocessing',
    keyrefs=['orderid', 'customerid', 'productid'],
    otherrefs=['paymentdateid', 'shipmentdateid', 'deliverydateid'],
    measures=['price', 'shipmentlag', 'deliverylag'],
    factexpander=computelag)
Firstly, a PEP 249 connection is created to perform the actual database operations. Then an instance of ConnectionWrapper is created as a uniform wrapper around the PEP 249 connection, and it is set as the default database connection for this ETL flow. Then a user-defined function to compute the lag measures is defined. Lastly, an AccumulatingSnapshotFactTable is created.
As stated, AccumulatingSnapshotFactTable.insert() inserts new facts directly into the fact table when they are passed to the method. AccumulatingSnapshotFactTable.lookup() checks if the database contains a fact with the given combination of keys referencing the dimensions. These methods behave in the same way as in FactTable. The method AccumulatingSnapshotFactTable.update() will, based on the keyrefs, find the fact and update it if there are any differences in otherrefs and measures. The method AccumulatingSnapshotFactTable.ensure() checks if the row it is given already exists in the database table. If it does not exist, it is immediately inserted. If it exists, the method will see if some of the values for otherrefs or measures have been updated in the passed row. If so, it will update the row in the database. Before that, it will, however, run the factexpander() if one was given to AccumulatingSnapshotFactTable.__init__() when the object was created. Note that the generated SQL for lookups and updates will use the keyrefs in the WHERE clause, so an index on them should be considered. An example of how to use the class can be seen below:
import psycopg2
import pygrametl
from pygrametl.tables import AccumulatingSnapshotFactTable

# The actual database connection is handled by a PEP 249 connection
pgconn = psycopg2.connect("""host='localhost' dbname='dw' user='dwuser'
                          password='dwpass'""")
conn = pygrametl.ConnectionWrapper(connection=pgconn)

# A factexpander can be used to modify a row only if it has been updated.
# Note that we ignore namemapping only for brevity; production code should
# use it
def computelag(row, namemapping, updated):
    if 'shipmentdateid' in updated:
        row['shipmentlag'] = row['shipmentdateid'] - row['paymentdateid']
    if 'deliverydateid' in updated:
        row['deliverylag'] = row['deliverydateid'] - row['shipmentdateid']

# This instance of AccumulatingSnapshotFactTable connects to the table
# orderprocessing in the database using the connection created above
asft = AccumulatingSnapshotFactTable(
    name='orderprocessing',
    keyrefs=['orderid', 'customerid', 'productid'],
    otherrefs=['paymentdateid', 'shipmentdateid', 'deliverydateid'],
    measures=['price', 'shipmentlag', 'deliverylag'],
    factexpander=computelag)

# A list of facts ready to be inserted into the fact table
facts = [{'orderid': 1, 'customerid': 1, 'productid': 1, 'price': 10},
         {'orderid': 2, 'customerid': 2, 'productid': 2, 'price': 20},
         {'orderid': 3, 'customerid': 3, 'productid': 3, 'price': 30}]

# The facts can be inserted using the ensure method. (If we had used the
# insert method instead, we should have made sure the facts above had a
# value for each attribute in the fact table. When using ensure, missing
# attributes will be set to None before an insertion.)
for row in facts:
    asft.ensure(row)

# Now assume that the orders get paid and shipped
facts[0]['paymentdateid'] = 12
facts[0]['shipmentdateid'] = 14
facts[2]['paymentdateid'] = 11

# Update the accumulating fact table in the DW
for row in facts:
    asft.ensure(row)  # will call computelag and do the needed updates
conn.commit()
conn.close()
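When the new values are already known, AccumulatingSnapshotFactTable.update() can also be called directly instead of AccumulatingSnapshotFactTable.ensure(). The following is a minimal sketch reusing asft, facts, and conn from the example above; it assumes it is placed before the final conn.commit() and conn.close() calls, and the deliverydateid value of 17 is chosen arbitrarily. Since only ensure() is described above as running the factexpander, the lag measure is computed manually here.
# Assume the first order has now been delivered; the value 17 is arbitrary
facts[0]['deliverydateid'] = 17
# Only ensure() is documented to run the factexpander, so the lag measure
# is computed manually instead of relying on computelag
facts[0]['deliverylag'] = facts[0]['deliverydateid'] \
    - facts[0]['shipmentdateid']
# update() finds the fact using the keyrefs and updates the changed
# otherrefs and measures
asft.update(facts[0])
conn.commit()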