Data Sources

pygrametl supports numerous data sources, which are iterable classes that produce rows. A row is a Python dict where the keys are the names of the columns in the table the row comes from, and the values are the data stored in that row. Users can easily implement new data sources by implementing an __iter__() method that produces dicts. As data sources are iterable, they can, e.g., be used in a loop as shown below:

for row in datasource:
    ...
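For instance, a minimal custom data source only needs an __iter__() method that produces dicts. The class and column names below are made up for illustration and are not part of pygrametl:

```python
class ListSource:
    """A minimal custom data source that produces each row as a dict."""

    def __init__(self, rows):
        self.rows = rows

    def __iter__(self):
        # Each produced row maps column names to the values stored in the row
        for row in self.rows:
            yield dict(row)


datasource = ListSource([{'id': 1, 'name': 'screw'},
                         {'id': 2, 'name': 'bolt'}])

for row in datasource:
    print(row['name'])
```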

While users can define their own data sources, pygrametl includes a number of commonly used data sources:

SQLSource

SQLSource is a data source used to iterate over the results of a single SQL query. The data source’s constructor must be passed a PEP 249 connection and not a ConnectionWrapper. As an example, a PostgreSQL connection created using the psycopg2 package is used below:

import psycopg2
from pygrametl.datasources import SQLSource

conn = psycopg2.connect(database='db', user='dbuser', password='dbpass')
sqlSource = SQLSource(connection=conn, query='SELECT * FROM table')

In the above example, an SQLSource is created in order to extract all rows from the table named table.

A tuple of strings can optionally be supplied through the names parameter to rename the elements of the query's result. Naturally, the number of supplied names must match the number of elements in the result:

import psycopg2
from pygrametl.datasources import SQLSource

conn = psycopg2.connect(database='db', user='dbuser', password='dbpass')
sqlSource = SQLSource(connection=conn, query='SELECT * FROM table',
                      names=('id', 'name', 'price'))

SQLSource also makes it possible to supply, through the initsql parameter, an SQL statement that will be executed before the query. The result of this statement is not returned. In the example below, a new view is created and then used in the query:

import psycopg2
from pygrametl.datasources import SQLSource

conn = psycopg2.connect(database='db', user='dbuser', password='dbpass')
sqlSource = SQLSource(connection=conn, query='SELECT * FROM view',
    initsql='CREATE VIEW view AS SELECT id, name FROM table WHERE price > 10')

CSVSource

CSVSource is a data source that returns a row for each line in a character-separated file. It is an alias for Python’s csv.DictReader, as csv.DictReader is already iterable and produces dicts. An example of how to use CSVSource to read a file containing comma-separated values is shown below:

from pygrametl.datasources import CSVSource

# ResultsFile.csv contains: name,age,score
csvSource = CSVSource(f=open('ResultsFile.csv', 'r', 16384), delimiter=',')

In the above example, a CSVSource is initialized with a file handler that uses a buffer size of 16384. This particular buffer size is used as it performed better than the alternatives we evaluated.

TypedCSVSource

TypedCSVSource extends CSVSource with typecasting by wrapping csv.DictReader instead of simply being an alias.

from pygrametl.datasources import TypedCSVSource

# ResultsFile.csv contains: name,age,score
typedCSVSource = TypedCSVSource(f=open('ResultsFile.csv', 'r', 16384),
                                casts={'age': int, 'score': float},
                                delimiter=',')

In the above example, a TypedCSVSource is initialized with a file handler that uses a buffer size of 16384. This particular buffer size is used as it performed better than the alternatives we evaluated. A dictionary is also passed that specifies the type each column should be cast to. No cast is performed for the name column as TypedCSVSource uses str by default.

PandasSource

PandasSource wraps a Pandas DataFrame so it can be used as a data source. The class reuses existing functionality provided by DataFrame. An example of how to use this class can be seen below. In this example, data is loaded from a spreadsheet, then transformed using a Pandas DataFrame, and lastly converted to an iterable that produces dicts for use with pygrametl:

import pandas
from pygrametl.datasources import PandasSource

df = pandas.read_excel('Revenue.xls')
df['price'] = df['price'].apply(lambda p: float(p) / 7.46)
pandasSource = PandasSource(df)

In the above example, a Pandas DataFrame is created from a spreadsheet containing sales revenue. Afterwards, the data in the price column is transformed using one of the higher-order functions built into the Pandas package. Lastly, so the data can be loaded into a data warehouse using pygrametl, a PandasSource is created with the DataFrame as an argument, making the rows of the DataFrame accessible through a data source.

MergeJoiningSource

In addition to the above data sources, which read data from external sources, pygrametl also includes a number of data sources that take other data sources as input in order to transform and/or combine them.

MergeJoiningSource can be used to equijoin the rows from two data sources. The rows of the two data sources must be delivered in sorted order. The shared attributes on which the rows are to be joined must also be given.

from pygrametl.datasources import CSVSource, MergeJoiningSource

products = CSVSource(f=open('products.csv', 'r', 16384), delimiter=',')
sales = CSVSource(f=open('sales.csv', 'r', 16384), delimiter='\t')
mergeJoiningSource = MergeJoiningSource(src1=products, key1='productid',
                                        src2=sales, key2='productid')

In the above example, a MergeJoiningSource is used to join two data sources on their shared attribute productid.

HashJoiningSource

HashJoiningSource functions similarly to MergeJoiningSource, but it performs the join using a hash map. Thus the two input data sources need not produce their rows in sorted order.

from pygrametl.datasources import CSVSource, HashJoiningSource

products = CSVSource(f=open('products.csv', 'r', 16384), delimiter=',')
sales = CSVSource(f=open('sales.csv', 'r', 16384), delimiter='\t')
hashJoiningSource = HashJoiningSource(src1=products, key1='productid',
                                      src2=sales, key2='productid')
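Conceptually, such an equijoin can be sketched in plain Python: build a hash map over the join key of one input, then probe it with the rows of the other. This is a simplified illustration of the semantics, not pygrametl's actual implementation:

```python
def hash_join(src1, key1, src2, key2):
    # Build phase: index the rows of src2 by their join key
    index = {}
    for row in src2:
        index.setdefault(row[key2], []).append(row)

    # Probe phase: emit one merged row per matching pair
    for row in src1:
        for match in index.get(row[key1], []):
            merged = dict(row)
            merged.update(match)
            yield merged


products = [{'productid': 1, 'name': 'screw'}]
sales = [{'productid': 1, 'price': 10}, {'productid': 1, 'price': 12}]

for joined in hash_join(products, 'productid', sales, 'productid'):
    print(joined)
```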

UnionSource

UnionSource creates a union of a number of supplied data sources. UnionSource does not require that the input data sources all produce rows containing the same attributes, which also means that a UnionSource does not guarantee that all of the rows it produces contain the same attributes.

from pygrametl.datasources import CSVSource, UnionSource

salesOne = CSVSource(f=open('sales1.csv', 'r', 16384), delimiter='\t')
salesTwo = CSVSource(f=open('sales2.csv', 'r', 16384), delimiter='\t')
salesThree = CSVSource(f=open('sales3.csv', 'r', 16384), delimiter='\t')

combinedSales = UnionSource(salesOne, salesTwo, salesThree)

Each data source is exhausted before the next one is read. This means that all rows are read from the first data source before any rows are read from the second data source, and so on.
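For sources whose rows share the same attributes, this exhaustion order matches that of Python's itertools.chain. A simplified analogy using plain lists of dicts:

```python
from itertools import chain

salesOne = [{'sale': 1}, {'sale': 2}]
salesTwo = [{'sale': 3}]

# All rows from the first source are produced before any from the second
combined = list(chain(salesOne, salesTwo))
print(combined)  # [{'sale': 1}, {'sale': 2}, {'sale': 3}]
```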

RoundRobinSource

It can also be beneficial to interleave rows, and for this purpose, RoundRobinSource can be used.

from pygrametl.datasources import CSVSource, RoundRobinSource

salesOne = CSVSource(f=open('sales1.csv', 'r', 16384), delimiter='\t')
salesTwo = CSVSource(f=open('sales2.csv', 'r', 16384), delimiter='\t')
salesThree = CSVSource(f=open('sales3.csv', 'r', 16384), delimiter='\t')

combinedSales = RoundRobinSource((salesOne, salesTwo, salesThree),
                                 batchsize=500)

In the above example, RoundRobinSource is given a number of data sources and the argument batchsize, which is the number of rows to read from one data source before moving on to the next in a round-robin fashion.
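The interleaving can be sketched in plain Python: repeatedly take up to batchsize rows from each source in turn until all of them are exhausted. This is an illustration of the semantics, not pygrametl's implementation:

```python
from itertools import islice


def round_robin(sources, batchsize):
    # Wrap each source in an iterator so progress is remembered
    iterators = [iter(src) for src in sources]
    while iterators:
        for it in list(iterators):
            batch = list(islice(it, batchsize))
            if not batch:
                iterators.remove(it)  # this source is exhausted
            for row in batch:
                yield row


a = [{'id': 1}, {'id': 2}, {'id': 3}]
b = [{'id': 4}]
print(list(round_robin([a, b], batchsize=2)))
```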

ProcessSource

ProcessSource is used for iterating over a data source using a separate worker process or thread. The worker reads data from the input data source and creates batches of rows. When a batch is complete, it is added to a queue so it can be consumed by another process or thread. If the queue is full the worker blocks until an element is removed from the queue. The sizes of the batches and the queue are optional parameters, but tuning them can often improve throughput. For more examples of the parallel features provided by pygrametl see Parallel.

from pygrametl.datasources import CSVSource, ProcessSource

sales = CSVSource(f=open('sales.csv', 'r', 16384), delimiter='\t')
processSource = ProcessSource(source=sales, batchsize=1000, queuesize=20)

FilteringSource

FilteringSource is used to apply a filter to a data source. By default, the built-in Python function bool is used, which removes empty rows. Alternatively, the user can supply a custom filter, a callable of the form f(row) that returns True when a row should be passed on. In the example below, rows are removed if the value of their location attribute is not Aalborg.

from pygrametl.datasources import CSVSource, FilteringSource


def locationfilter(row):
    return row['location'] == 'Aalborg'


sales = CSVSource(f=open('sales.csv', 'r', 16384), delimiter='\t')
salesFiltered = FilteringSource(source=sales, filter=locationfilter)
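The default behaviour corresponds to Python's built-in filter with bool as the predicate: an empty dict is falsy, so empty rows are dropped. A plain-Python illustration:

```python
rows = [{'location': 'Aalborg'}, {}, {'location': 'Copenhagen'}]

# bool({}) is False, so the empty row is removed
nonEmpty = list(filter(bool, rows))
print(nonEmpty)  # [{'location': 'Aalborg'}, {'location': 'Copenhagen'}]
```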

MappingSource

MappingSource can be used to apply functions to the columns of a data source. It can be given a dictionary where the keys are columns and the values are callables of the form f(val). The functions are applied to the attributes in an undefined order. In the example below, a function is used to cast all values of the attribute price to integers while rows are being read from a CSV file.

from pygrametl.datasources import CSVSource, MappingSource

sales = CSVSource(f=open('sales.csv', 'r', 16384), delimiter=',')
salesMapped = MappingSource(source=sales, callables={'price': int})

TransformingSource

TransformingSource can be used to apply functions to the rows of a data source. The class can be supplied with a number of callable functions of the form f(row), which will be applied to the source in the given order.

import pygrametl
from pygrametl.datasources import CSVSource, TransformingSource


def dkk_to_eur(row):
    price_as_a_number = int(row['price'])
    row['dkk'] = price_as_a_number
    row['eur'] = price_as_a_number / 7.43


sales = CSVSource(f=open('sales.csv', 'r', 16384), delimiter=',')
salesTransformed = TransformingSource(sales, dkk_to_eur)

In the above example, the price is converted from a string to an integer and stored in the row as two currencies.

SQLTransformingSource

SQLTransformingSource can be used to transform the rows of a data source using SQL. SQLTransformingSource loads the rows into a temporary table in an RDBMS and then retrieves them using an SQL query. By default, each SQLTransformingSource uses a separate in-memory SQLite database, but another database can be used by passing a PEP 249 connection or one of the ConnectionWrapper types as the targetconnection parameter. By using an on-disk database, SQLTransformingSource can be used with data sets that do not fit in memory. If an existing database is used, the rows from the data source can also be enriched with data from other tables in that database, e.g., by joining the rows with an existing table. Be aware that SQLTransformingSource creates, empties, and drops the temporary table.

import pygrametl
from pygrametl.datasources import TypedCSVSource, SQLTransformingSource

sales = TypedCSVSource(f=open('sales.csv', 'r', 16384),
                       casts={'price': int}, delimiter=',')

salesTransformed = SQLTransformingSource(sales,
    "sales", "SELECT product, SUM(price) FROM sales GROUP BY product")

In the above example, the total revenue is computed for each product. First, a temporary in-memory SQLite database is created. Then a temporary table named sales with the same schema as the rows in sales is created. Finally, the rows in sales are loaded into the temporary table in batches, and the final result is produced by executing the provided SQL query. In addition to the required parameters shown above, SQLTransformingSource also has multiple optional parameters, e.g., extendedcasts accepts a dict that specifies how Python types should be mapped to SQL types, perbatch specifies whether the transformation should be applied per batch of rows or to all rows in the input data source, and columnnames allows the columns in the output rows to be renamed.

CrossTabbingSource

CrossTabbingSource can be used to compute the cross tab of a data source. The class takes as parameters the names of the attributes that are to appear as rows and columns in the crosstab, as well as the name of the attribute to aggregate. By default, the values are aggregated using pygrametl.aggregators.Sum, but the class also accepts an alternate aggregator from the module pygrametl.aggregators.

from pygrametl.datasources import CSVSource, CrossTabbingSource, \
    TransformingSource
from pygrametl.aggregators import Avg


def dkk_to_eur(row):
    price_as_a_number = int(row['price'])
    row['dkk'] = price_as_a_number
    row['eur'] = price_as_a_number / 7.43


sales = CSVSource(f=open('sales.csv', 'r', 16384), delimiter=',')
salesTransformed = TransformingSource(sales, dkk_to_eur)

crossTab = CrossTabbingSource(source=salesTransformed, rowvaluesatt='product',
                              colvaluesatt='location', values='eur',
                              aggregator=Avg())

In the above example, a crosstab is made from a table containing sales data in order to view the average price of products across different locations. TransformingSource is used to parse and convert the price from DKK to EUR.
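The averaging step can be sketched in plain Python: collect the values for each (row, column) cell and average them. This is a simplified illustration of the crosstab semantics, not pygrametl's implementation:

```python
from collections import defaultdict


def crosstab_avg(rows, rowatt, colatt, valatt):
    # Collect all values for each (row value, column value) cell
    cells = defaultdict(list)
    for row in rows:
        cells[(row[rowatt], row[colatt])].append(row[valatt])
    # Average the values within each cell
    return {cell: sum(vals) / len(vals) for cell, vals in cells.items()}


sales = [{'product': 'screw', 'location': 'Aalborg', 'eur': 2.0},
         {'product': 'screw', 'location': 'Aalborg', 'eur': 4.0}]
print(crosstab_avg(sales, 'product', 'location', 'eur'))
```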

DynamicForEachSource

DynamicForEachSource is a data source that creates a new data source for each element in a sequence provided as input, and then iterates over these data sources one at a time. To create the new data sources, the user must provide a function that, when called with a single argument, returns a new data source. In the example below, DynamicForEachSource is used to create a CSVSource for each of the CSV files in a directory. DynamicForEachSource stores the input sequence in a process-safe queue, so a DynamicForEachSource instance can be given to multiple ProcessSource instances. For information about pygrametl's parallel features, see Parallel.

import glob
from pygrametl.datasources import CSVSource, DynamicForEachSource


def createCSVSource(filename):
    return CSVSource(f=open(filename, 'r', 16384), delimiter=',')


salesFiles = glob.glob('sales/*.csv')
combinedSales = DynamicForEachSource(seq=salesFiles, callee=createCSVSource)