Data Sources¶
pygrametl supports numerous data sources, which are iterable classes that produce rows. A row is a Python dict where the keys are the names of the columns in the table the row comes from, and the values are the data stored in that row. Users can easily implement new data sources by implementing a version of the __iter__() method that returns dicts. As data sources are iterable, they can, e.g., be used in a loop as shown below:
for row in datasource:
    ...
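Because only __iter__() returning dicts is required, a user-defined data source can be very small. The class below is a minimal sketch of this; the class name and the rows it yields are made up for illustration:
class ListSource:
    # A hypothetical data source that simply yields rows from a list of dicts
    def __init__(self, rows):
        self.rows = rows

    def __iter__(self):
        for row in self.rows:
            yield row

datasource = ListSource([{'id': 1, 'name': 'Jacket', 'price': 100},
                         {'id': 2, 'name': 'Shirt', 'price': 50}])
for row in datasource:
    print(row['name'], row['price'])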
While users can define their own data sources, pygrametl includes a number of commonly used data sources:
SQLSource¶
SQLSource is a data source used to iterate over the results of a single SQL query. The data source’s constructor must be passed a PEP 249 connection and not a ConnectionWrapper. As an example, a PostgreSQL connection created using the psycopg2 package is used below:
import psycopg2
from pygrametl.datasources import SQLSource
conn = psycopg2.connect(database='db', user='dbuser', password='dbpass')
sqlSource = SQLSource(connection=conn, query='SELECT * FROM table')
In the above example, an SQLSource is created in order to extract all rows from the table named table.
A tuple of strings can optionally be supplied as the parameter names to automatically rename the elements in the query results. Naturally, the number of supplied names must match the number of elements in the result:
import psycopg2
from pygrametl.datasources import SQLSource
conn = psycopg2.connect(database='db', user='dbuser', password='dbpass')
sqlSource = SQLSource(connection=conn, query='SELECT * FROM table',
names=('id', 'name', 'price'))
SQLSource also makes it possible to supply an SQL expression that will be executed before the query, through the initsql parameter. The result of the expression will not be returned. In the example below, a new view is created and then used in the query:
import psycopg2
from pygrametl.datasources import SQLSource
conn = psycopg2.connect(database='db', user='dbuser', password='dbpass')
sqlSource = SQLSource(connection=conn, query='SELECT * FROM view',
initsql='CREATE VIEW view AS SELECT id, name FROM table WHERE price > 10')
CSVSource¶
CSVSource is a data source that returns a row for each line in a character-separated file. It is an alias for Python’s csv.DictReader, as that class is already iterable and returns dicts. An example of how to use CSVSource to read a file containing comma-separated values is shown below:
from pygrametl.datasources import CSVSource
# ResultsFile.csv contains: name,age,score
csvSource = CSVSource(f=open('ResultsFile.csv', 'r', 16384), delimiter=',')
In the above example, a CSVSource is initialized with a file handle that uses a buffer size of 16384. This particular buffer size is used as it performed better than the alternatives we evaluated it against.
TypedCSVSource¶
TypedCSVSource extends CSVSource with typecasting by wrapping csv.DictReader instead of simply being an alias.
from pygrametl.datasources import TypedCSVSource
# ResultsFile.csv contains: name,age,score
typedCSVSource = TypedCSVSource(f=open('ResultsFile.csv', 'r', 16384),
casts={'age': int, 'score': float},
delimiter=',')
In the above example, a TypedCSVSource is initialized with a file handle that uses a buffer size of 16384. This particular buffer size is used as it performed better than the alternatives we evaluated it against. A dictionary is also passed which specifies what type each column should be cast to. A cast is not performed for the name column as TypedCSVSource uses str as the default.
PandasSource¶
PandasSource wraps a Pandas DataFrame so it can be used as a data source. The class reuses existing functionality provided by DataFrame. An example of how to use this class can be seen below. In this example, data is loaded from a spreadsheet, then transformed using a Pandas DataFrame, and last converted to an iterable that produces dicts for use with pygrametl:
import pandas
from pygrametl.datasources import PandasSource
df = pandas.read_excel('Revenue.xls')
df['price'] = df['price'].apply(lambda p: float(p) / 7.46)
pandasSource = PandasSource(df)
In the above example, a Pandas DataFrame is created from a spreadsheet containing revenue from some form of sales. Afterwards, the data in the price column is transformed using one of the higher-order functions built into the Pandas package. Last, so the data can be loaded into a data warehouse using pygrametl, a PandasSource is created with the DataFrame as an argument, making the rows of the DataFrame accessible through a data source.
MergeJoiningSource¶
In addition to the above data sources, which read data from external sources, pygrametl also includes a number of data sources that take other data sources as input in order to transform and/or combine them.
MergeJoiningSource can be used to equijoin the rows from two data sources. The rows of the two data sources must be delivered in sorted order. The shared attributes on which the rows are to be joined must also be given.
from pygrametl.datasources import CSVSource, MergeJoiningSource
products = CSVSource(f=open('products.csv', 'r', 16384), delimiter=',')
sales = CSVSource(f=open('sales.csv', 'r', 16384), delimiter='\t')
mergeJoiningSource = MergeJoiningSource(src1=products, key1='productid',
src2=sales, key2='productid')
In the above example, a MergeJoiningSource is used to join two data sources on their shared attribute productid.
HashJoiningSource¶
HashJoiningSource functions similarly to MergeJoiningSource, but it performs the join using a hash map. Thus, the two input data sources need not produce their rows in sorted order.
from pygrametl.datasources import CSVSource, HashJoiningSource
products = CSVSource(f=open('products.csv', 'r', 16384), delimiter=',')
sales = CSVSource(f=open('sales.csv', 'r', 16384), delimiter='\t')
hashJoiningSource = HashJoiningSource(src1=products, key1='productid',
src2=sales, key2='productid')
UnionSource¶
The class UnionSource creates a union of the supplied data sources. UnionSource does not require that the input data sources all produce rows containing the same attributes, which also means that a UnionSource does not guarantee that all of the rows it produces contain the same attributes.
from pygrametl.datasources import CSVSource, UnionSource
salesOne = CSVSource(f=open('sales1.csv', 'r', 16384), delimiter='\t')
salesTwo = CSVSource(f=open('sales2.csv', 'r', 16384), delimiter='\t')
salesThree = CSVSource(f=open('sales3.csv', 'r', 16384), delimiter='\t')
combinedSales = UnionSource(salesOne, salesTwo, salesThree)
Each data source is exhausted before the next data source is read. This means that all rows are read from the first data source before any rows are read from the second data source, and so on.
RoundRobinSource¶
It can also be beneficial to interleave rows, and for this purpose, RoundRobinSource can be used.
from pygrametl.datasources import CSVSource, RoundRobinSource
salesOne = CSVSource(f=open('sales1.csv', 'r', 16384), delimiter='\t')
salesTwo = CSVSource(f=open('sales2.csv', 'r', 16384), delimiter='\t')
salesThree = CSVSource(f=open('sales3.csv', 'r', 16384), delimiter='\t')
combinedSales = RoundRobinSource((salesOne, salesTwo, salesThree),
batchsize=500)
In the above example, RoundRobinSource is given a number of data sources and the argument batchsize, which is the number of rows to be read from one data source before reading from the next in a round-robin fashion.
ProcessSource¶
ProcessSource is used for iterating over a data source using a separate worker process or thread. The worker reads data from the input data source and creates batches of rows. When a batch is complete, it is added to a queue so it can be consumed by another process or thread. If the queue is full, the worker blocks until an element is removed from the queue. The sizes of the batches and the queue are optional parameters, but tuning them can often improve throughput. For more examples of the parallel features provided by pygrametl, see Parallel.
from pygrametl.datasources import CSVSource, ProcessSource
sales = CSVSource(f=open('sales.csv', 'r', 16384), delimiter='\t')
processSource = ProcessSource(source=sales, batchsize=1000, queuesize=20)
FilteringSource¶
FilteringSource is used to apply a filter to a data source. By default, the built-in Python function bool is used, which can be used to remove empty rows. Alternatively, the user can supply a custom filter, which should be a callable of the form f(row) that returns True when a row should be passed on. In the example below, rows are removed if the value of their location attribute is not Aalborg.
from pygrametl.datasources import CSVSource, FilteringSource
def locationfilter(row):
    return row['location'] == 'Aalborg'
sales = CSVSource(f=open('sales.csv', 'r', 16384), delimiter='\t')
salesFiltered = FilteringSource(source=sales, filter=locationfilter)
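If no custom filter is given, FilteringSource falls back to the built-in bool function described above, which discards rows that are empty. A minimal sketch, reusing the sales.csv file from the example above:
from pygrametl.datasources import CSVSource, FilteringSource
sales = CSVSource(f=open('sales.csv', 'r', 16384), delimiter='\t')
# No filter argument is given, so empty rows are removed using bool
salesWithoutEmptyRows = FilteringSource(source=sales)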
MappingSource¶
MappingSource can be used to apply functions to the columns of a data source. It can be given a dictionary where the keys are the columns and the values are callable functions of the form f(val). The functions will be applied to the attributes in an undefined order. In the example below, a function is used to cast all values for the attribute price to integers while rows are being read from a CSV file.
from pygrametl.datasources import CSVSource, MappingSource
sales = CSVSource(f=open('sales.csv', 'r', 16384), delimiter=',')
salesMapped = MappingSource(source=sales, callables={'price': int})
TransformingSource¶
TransformingSource can be used to apply functions to the rows of a data source. The class can be supplied with a number of callable functions of the form f(row), which will be applied to the source in the given order.
import pygrametl
from pygrametl.datasources import CSVSource, TransformingSource
def dkk_to_eur(row):
    price_as_a_number = int(row['price'])
    row['dkk'] = price_as_a_number
    row['eur'] = price_as_a_number / 7.43
sales = CSVSource(f=open('sales.csv', 'r', 16384), delimiter=',')
salesTransformed = TransformingSource(sales, dkk_to_eur)
In the above example, the price is converted from a string to an integer and stored in the row as two currencies.
SQLTransformingSource¶
SQLTransformingSource can be used to transform the rows of a data source using SQL. SQLTransformingSource loads the rows into a temporary table in an RDBMS and then retrieves them using an SQL query. By default, each SQLTransformingSource uses a separate in-memory SQLite database, but another database can be used by passing a PEP 249 connection or one of the ConnectionWrapper types as the parameter targetconnection. By using an on-disk database, SQLTransformingSource can be used with datasets that do not fit in memory. If an existing database is used, the rows from the data source can also be enriched using data from other tables, e.g., by joining the rows with an existing table in the database. Be aware that SQLTransformingSource creates, empties, and drops the temporary table.
import pygrametl
from pygrametl.datasources import TypedCSVSource, SQLTransformingSource
sales = TypedCSVSource(f=open('sales.csv', 'r', 16384),
casts={'price': int}, delimiter=',')
salesTransformed = SQLTransformingSource(sales,
"sales", "SELECT product, SUM(price) FROM sales GROUP BY product")
In the above example, the total revenue is computed for each product. First, a temporary in-memory SQLite database is created. Then a temporary table named sales with the same schema as the rows in sales is created. Finally, the rows in sales are loaded into the temporary table in batches, and the final result is produced by executing the provided SQL query. In addition to the required parameters shown above, SQLTransformingSource also has multiple optional parameters, e.g., extendedcasts accepts a dict that specifies how Python types should be mapped to SQL types, perbatch specifies if the transformation should be applied for each batch of rows or for all rows in the input data source, and columnnames allows the columns in the output rows to be renamed.
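As a sketch of how some of these optional parameters might be used together, the example below stores the temporary table in a hypothetical on-disk SQLite database via targetconnection and renames the output columns via columnnames; the parameter names are those listed above, while the database file name and values are only illustrative:
import sqlite3
from pygrametl.datasources import TypedCSVSource, SQLTransformingSource

# A hypothetical on-disk SQLite database so the dataset need not fit in memory
conn = sqlite3.connect('transformations.sqlite')

sales = TypedCSVSource(f=open('sales.csv', 'r', 16384),
                       casts={'price': int}, delimiter=',')
salesTransformed = SQLTransformingSource(
    sales, "sales", "SELECT product, SUM(price) FROM sales GROUP BY product",
    columnnames=('product', 'revenue'),
    targetconnection=conn)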
CrossTabbingSource¶
CrossTabbingSource can be used to compute the crosstab of a data source. The class takes as parameters the names of the attributes that are to appear as rows and columns in the crosstab, as well as the name of the attribute to aggregate. By default, the values are aggregated using pygrametl.aggregators.Sum, but the class also accepts an alternative aggregator from the module pygrametl.aggregators.
from pygrametl.datasources import CSVSource, CrossTabbingSource, \
TransformingSource
from pygrametl.aggregators import Avg
def dkk_to_eur(row):
    price_as_a_number = int(row['price'])
    row['dkk'] = price_as_a_number
    row['eur'] = price_as_a_number / 7.43
sales = CSVSource(f=open('sales.csv', 'r', 16384), delimiter=',')
salesTransformed = TransformingSource(sales, dkk_to_eur)
crossTab = CrossTabbingSource(source=salesTransformed, rowvaluesatt='product',
colvaluesatt='location', values='eur',
aggregator=Avg())
In the above example, a crosstab is made from a table containing sales data in order to view the average price of products across different locations. TransformingSource is used to parse and convert the price from DKK to EUR.
DynamicForEachSource¶
DynamicForEachSource is a data source that, for each element of the provided input sequence, creates a new data source that is then iterated over by the DynamicForEachSource. To create the new data sources, the user must provide a function that, when called with a single argument, returns a new data source. In the example below, DynamicForEachSource is used to create a CSVSource for each of the CSV files in a directory. DynamicForEachSource stores the input list in a safe multiprocessing queue, and as such a DynamicForEachSource instance can be given to several ProcessSource instances. For information about pygrametl’s parallel features, see Parallel.
import glob
from pygrametl.datasources import CSVSource, DynamicForEachSource
def createCSVSource(filename):
    return CSVSource(f=open(filename, 'r', 16384), delimiter=',')
salesFiles = glob.glob('sales/*.csv')
combinedSales = DynamicForEachSource(seq=salesFiles, callee=createCSVSource)
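As a sketch of the parallel use mentioned above, the same DynamicForEachSource instance could be given to multiple ProcessSource instances so the CSV files are read by separate workers; the batch and queue sizes are only illustrative, and combining the workers with a UnionSource is just one possible way to consume the rows:
from pygrametl.datasources import ProcessSource, UnionSource

# Both workers pull file names from the shared multiprocessing queue,
# so each CSV file is read by only one of them
workerOne = ProcessSource(source=combinedSales, batchsize=500, queuesize=10)
workerTwo = ProcessSource(source=combinedSales, batchsize=500, queuesize=10)

# The rows produced by the two workers can, e.g., be combined again
allSales = UnionSource(workerOne, workerTwo)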