pygrametl - ETL programming in Python

pygrametl (pronounced py-gram-e-t-l) is a Python framework that offers commonly used functionality for the development of Extract-Transform-Load (ETL) processes. It is open source and released under a BSD license. It works with both CPython and Jython, so you can also use existing Java code and JDBC drivers in your ETL program.

When using pygrametl, the developer codes the ETL process in Python. This turns out to be very efficient, even when compared to drawing the process in a graphical user interface (GUI).

Concretely, the developer creates an object for each dimension and fact table and can then easily add new members by calling dimension.insert(row), where row is a dict holding the values to insert. This is a very simple example, but pygrametl also supports much more complicated scenarios. For example, it is possible to create a single object for a snowflaked dimension. A new dimension member can then still be added with a single method call, snowflake.insert(row), which automatically performs the necessary lookups and insertions in the tables participating in the snowflake.

pygrametl also supports slowly changing dimensions. Again, the programmer only has to invoke a single method: scdim.scdensure(row). This performs the needed updates for both type 1 changes (i.e., overwrites) and type 2 changes (i.e., additions of new versions).

pygrametl was first made publicly available in 2009. Since then, we have made multiple improvements and added new features. Version 2.5 was released in September 2016. Today, pygrametl is used in production systems in different sectors such as healthcare, finance, and transport.

Installation

You can install pygrametl from PyPI with the following command.

$ pip install pygrametl

The latest development version of pygrametl is available on GitHub.

$ git clone https://github.com/chrthomsen/pygrametl.git

For more information about installation, see the Install Guide.

Example Program

Here you can see a complete example of a pygrametl program. The ETL program extracts data from two CSV files and joins their content before loading it into a data warehouse with the following schema.

Schema

The schema consists of a fact table, testresults, and three dimensions: test, time, and page. The test and time dimensions are each represented by a single table, test and date, respectively. The page dimension is slowly changing and snowflaked into five tables: page, domain, topleveldomain, serverversion, and server.

The code can be seen below and defines a few functions at the top. After the functions, the pygrametl Dimension, FactTable, and Source objects are created. Using these objects, the main function only requires 10 lines of code to load the data warehouse. Note how easy it is to fill the page dimension even though it is slowly changing and snowflaked.

# This is code for loading the data warehouse from the running example
# presented in C. Thomsen & T.B. Pedersen: "pygrametl: A Powerful Programming
# Framework for Extract--Transform--Load Programmers"
#
# It is made to be used with PostgreSQL and psycopg2 but you can
# modify it to work with another DBMS.


#
#  Copyright (c) 2009, 2010 Christian Thomsen ( chr @ cs . aau . dk )
#
#  This file is free software: you may copy, redistribute and/or modify it
#  under the terms of the GNU General Public License version 2
#  as published by the Free Software Foundation.
#
#  This file is distributed in the hope that it will be useful, but
#  WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#  General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see http://www.gnu.org/licenses.
#


import datetime
import sys
import time

# In this example, we use psycopg2. You can change it to another driver,
# but then the method pgcopybulkloader won't work as we use driver-specific
# code there.
# You can make another function or declare facttbl (see further below) to
# be a BatchFactTable such that you don't need special
# bulk loading methods.

import psycopg2

# Depending on your system, you might have to do something like this
# where you append the path where pygrametl is installed
sys.path.append('/home/me/code')

import pygrametl
from pygrametl.datasources import CSVSource, MergeJoiningSource
from pygrametl.tables import CachedDimension, SnowflakedDimension,\
    SlowlyChangingDimension, BulkFactTable


# Connection to the target data warehouse:
pgconn = psycopg2.connect(user='me')
connection = pygrametl.ConnectionWrapper(pgconn)
connection.setasdefault()
connection.execute('set search_path to pygrametlexa')


# Methods
def pgcopybulkloader(name, atts, fieldsep, rowsep, nullval, filehandle):
    # Here we use driver-specific code to get fast bulk loading.
    # You can change this method if you use another driver or you can
    # use the FactTable or BatchFactTable classes (which don't require
    # use of driver-specific code) instead of the BulkFactTable class.
    global connection
    curs = connection.cursor()
    curs.copy_from(file=filehandle, table=name, sep=fieldsep,
                   null=str(nullval), columns=atts)

def datehandling(row, namemapping):
    # This method is called from ensure(row) when the lookup of a date fails.
    # In the Real World, you would probably prefill the date dimension, but
    # we use this to illustrate "rowexpanders" that make it possible to
    # calculate derived attributes on demand (such that the - possibly
    # expensive - calculations only are done when needed and not for each
    # seen data row).
    #
    # Here, we calculate all date related fields and add them to the row.
    date = pygrametl.getvalue(row, 'date', namemapping)
    (year, month, day, hour, minute, second, weekday, dayinyear, dst) = \
        time.strptime(date, "%Y-%m-%d")
    (isoyear, isoweek, isoweekday) = \
        datetime.date(year, month, day).isocalendar()
    row['day'] = day
    row['month'] = month
    row['year'] = year
    row['week'] = isoweek
    row['weekyear'] = isoyear
    row['dateid'] = dayinyear + 366 * (year - 1990) #Support dates from 1990
    return row


def extractdomaininfo(row):
    # Take the 'www.domain.org' part from 'http://www.domain.org/page.html'
    # We also keep the host name ('www') as part of the domain in this example.
    domaininfo = row['url'].split('/')[-2]
    row['domain'] = domaininfo
    # Take the top level which is the last part of the domain
    row['topleveldomain'] = domaininfo.split('.')[-1]

def extractserverinfo(row):
    # Find the server name from a string like "ServerName/Version"
    row['server'] = row['serverversion'].split('/')[0]

# Dimension and fact table objects
topleveldim = CachedDimension(
    name='topleveldomain',
    key='topleveldomainid',
    attributes=['topleveldomain'])

domaindim = CachedDimension(
    name='domain',
    key='domainid',
    attributes=['domain', 'topleveldomainid'],
    lookupatts=['domain'])

serverdim = CachedDimension(
    name='server',
    key='serverid',
    attributes=['server'])

serverversiondim = CachedDimension(
    name='serverversion',
    key='serverversionid',
    attributes=['serverversion', 'serverid'])

pagedim = SlowlyChangingDimension(
    name='page',
    key='pageid',
    attributes=['url', 'size', 'validfrom', 'validto', 'version',
                'domainid', 'serverversionid'],
    lookupatts=['url'],
    versionatt='version',
    fromatt='validfrom',
    toatt='validto',
    srcdateatt='lastmoddate',
    cachesize=-1)

pagesf = SnowflakedDimension(
    [(pagedim, (serverversiondim, domaindim)),
     (serverversiondim, serverdim),
     (domaindim, topleveldim)
     ])

testdim = CachedDimension(
    name='test',
    key='testid',
    attributes=['testname', 'testauthor'],
    lookupatts=['testname'],
    prefill=True,
    defaultidvalue=-1)

datedim = CachedDimension(
    name='date',
    key='dateid',
    attributes=['date', 'day', 'month', 'year', 'week', 'weekyear'],
    lookupatts=['date'],
    rowexpander=datehandling)

facttbl = BulkFactTable(
    name='testresults',
    keyrefs=['pageid', 'testid', 'dateid'],
    measures=['errors'],
    bulkloader=pgcopybulkloader,
    bulksize=5000000)


# Data sources - change the path if you have your files somewhere else
# The buffer size is set to 16384 B because that performed best in our tests
downloadlog = CSVSource(open('./DownloadLog.csv', 'r', 16384),
                        delimiter='\t')

testresults = CSVSource(open('./TestResults.csv', 'r', 16384),
                        delimiter='\t')

inputdata = MergeJoiningSource(downloadlog, 'localfile',
                               testresults, 'localfile')

def main():
    for row in inputdata:
        extractdomaininfo(row)
        extractserverinfo(row)
        row['size'] = pygrametl.getint(row['size']) # Convert to an int
        # Add the data to the dimension tables and the fact table
        row['pageid'] = pagesf.scdensure(row)
        row['dateid'] = datedim.ensure(row, {'date':'downloaddate'})
        row['testid'] = testdim.lookup(row, {'testname':'test'})
        facttbl.insert(row)
    connection.commit()

if __name__ == '__main__':
    main()

Documentation

The documentation, including installation and beginner guides, is available online here.

In addition to the documentation, multiple papers about pygrametl have been published. The papers provide a more detailed description of the foundational ideas behind pygrametl, but they are obviously not kept up to date with the changes and improvements implemented in the framework; for those, see the documentation.

If you use pygrametl in academia, please cite the relevant paper(s).

Community

To keep the development of pygrametl open for external participation, we endeavour to keep all communication public through the mailing lists and GitHub. Feel free to ask questions and provide all kinds of feedback, especially if you choose not to use pygrametl.

We encourage the use of GitHub and the mailing lists. For discussions not suitable for a public mailing list, you can, however, send us a private email.