Parallel¶
pygrametl provides multiple abstractions that simplify the creation of parallel ETL flows, taking advantage of modern multi-core and multi-processor systems. Firstly, any data source can be read in a separate process using ProcessSource. Further parallelism can be achieved by decoupling tables from the main process and allowing these decoupled tables to communicate with each other without interrupting the main process. Tables can also be partitioned so operations on large tables can be performed by multiple processes. Both decoupled tables and partitioned tables can be found in the tables module. For multiple decoupled tables to share a database connection, the ConnectionWrapper or JDBCConnectionWrapper must be wrapped by the function shareconnectionwrapper() before it is used by the decoupled tables.
pygrametl also provides abstractions for running functions in parallel. The decorator splitpoint() can be used to annotate functions that should run in separate processes. This supplements the decoupled tables, as rows are often transformed by a set of functions before they are inserted into a database table. Splitpoints can be synchronized using the function endsplits(). The function createflow() can be used to create a sequence of functions that run in separate processes. In a flow, a row is first given to the first function, then to the second, and so forth. This also means that the passed row must be modified in place, as the functions' return values are ignored.
Note
pygrametl supports executing parallel ETL flows using CPython (only on platforms that start new processes using fork) or Jython. The method used by CPython to start a process can be determined using multiprocessing.get_start_method(). Unix-like operating systems generally use fork by default, but some must be configured to use fork through multiprocessing.set_start_method(). Microsoft Windows does not support fork, so a Unix-like environment must be installed, e.g., using Windows Subsystem for Linux.
Due to CPython’s GIL, Jython should be used to run ETL flows that use pygrametl's parallel constructs. This is because Jython allows threads to be used for parallel processing, while it is necessary to use processes in CPython. Thus, the term process is used to denote a process or a thread, depending on the Python implementation in question. For more information on using pygrametl on Jython, see Jython.
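On CPython, the start method can also be checked programmatically before the ETL flow is constructed so an unsupported configuration is caught early. The following is a minimal sketch using only the standard library's multiprocessing module; it assumes the ETL program should simply stop instead of falling back to sequential execution if fork is unavailable.
import multiprocessing
import sys
# pygrametl's parallel constructs require processes to be started with fork
# on CPython, so the configuration is verified before the ETL flow is built
if multiprocessing.get_start_method(allow_none=True) != 'fork':
    try:
        # Some Unix-like systems default to spawn and must be reconfigured
        multiprocessing.set_start_method('fork')
    except (ValueError, RuntimeError):
        sys.exit("fork is unavailable, run the ETL flow on Jython or WSL")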
ProcessSource¶
ProcessSource is a data source that allows other data sources to be iterated in a separate process. Data sources in pygrametl are a set of abstractions that provide access to many types of data through a normal Python iterator. For more information about data sources, see Data Sources.
from pygrametl.tables import FactTable, CachedDimension
from pygrametl.datasources import CSVSource, ProcessSource, \
    TransformingSource
from pygrametl.JDBCConnectionWrapper import JDBCConnectionWrapper

# JDBC and Jython are used as threads usually provide better performance
import java.sql.DriverManager
jconn = java.sql.DriverManager.getConnection(
    "jdbc:postgresql://localhost/dw?user=dwuser&password=dwpass")
conn = JDBCConnectionWrapper(jdbcconn=jconn)

factTable = FactTable(
    name='facttable',
    measures=['sale'],
    keyrefs=['storeid', 'productid', 'dateid'])

productTable = CachedDimension(
    name='product',
    key='productid',
    attributes=['name', 'price'],
    lookupatts=['name'])

# A set of computationally expensive functions is needed to transform the
# facts before they can be inserted into the fact table. Each function must
# be defined as func(row) so a TransformingSource can combine them before
# they are passed to ProcessSource and run in another thread
def convertReals(row):
    # Converting a string encoding of a float to an integer must be done in
    # two steps, first to a float and then to an integer
    row['sale'] = int(float(row['sale']))

def trimProductname(row):
    row['name'] = row['name'].strip()

# In the transformation we use three data sources to retrieve rows from
# sales.csv, first CSVSource to read the csv file, then TransformingSource
# to transform the rows, and lastly ProcessSource to do both the reading
# and the transformation in another thread
sales = CSVSource(f=open('sales.csv', 'r'), delimiter=',')
transSales = TransformingSource(sales, convertReals, trimProductname)
salesProcess = ProcessSource(transSales)

# While the sales are being read and transformed by the spawned thread, the
# main thread is occupied with pre-loading the product dimension with data
# from product.csv
products = CSVSource(f=open('product.csv', 'r'), delimiter=',')
for row in products:
    productTable.insert(row)

# After the ProcessSource has read rows from the data source provided, they
# can be accessed through the ProcessSource's iterator like any other data
# source
for row in salesProcess:
    row['productid'] = productTable.lookup(row)
    factTable.insert(row)

conn.commit()
conn.close()
In the above example, we use a ProcessSource to transform a set of rows from sales.csv while we fill the product dimension with data. As a ProcessSource adds additional overhead to the iterator, since rows must be transferred in batches from another process, other computations should be performed between the creation and the use of the data source so there is time for the data to be read, transformed, and transferred.
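How much of this overhead is hidden also depends on the size of the transferred batches and on how many batches may be buffered between the processes. The sketch below assumes that ProcessSource accepts the batchsize and queuesize arguments described in the API documentation; the concrete values are only illustrative and should be tuned for the actual hardware and data volume.
from pygrametl.datasources import CSVSource, ProcessSource

sales = CSVSource(f=open('sales.csv', 'r'), delimiter=',')

# Transfer rows in larger batches (batchsize) and allow more batches to be
# buffered between the processes (queuesize) to reduce synchronization
# overhead at the cost of memory; both values are purely illustrative
salesProcess = ProcessSource(sales, batchsize=2000, queuesize=10)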
Decoupled Tables¶
A decoupled table in pygrametl is a proxy for an instance of another table class defined in the tables module. Currently, two classes exist for decoupled tables, DecoupledDimension and DecoupledFactTable. The two classes behave nearly identically, with one implementing the interface of a dimension and the other the interface of a fact table. When a method is called on one of the two classes, a message is sent to the actual table object, and if the method has a return value, an instance of the class FutureResult is returned. This instance is a handle to the actual result when it becomes available. To get the actual result, the instance can be given directly to a method accepting a row, which forces the method to block until the value is ready, or the entire decoupled table can be consumed by another decoupled table. When a decoupled table is consumed by another decoupled table, the values are extracted from the instances of FutureResult by the table that needs them without blocking the caller of methods on that table. It should, however, be noted that any row passed to an instance of DecoupledFactTable or DecoupledDimension should only contain the attributes directly needed by the table, as additional key/value pairs in the dict can make pygrametl insert the row before the actual values are ready, so instances of the class FutureResult are incorrectly passed to the database instead.
from pygrametl.datasources import CSVSource
from pygrametl.tables import FactTable, CachedDimension, \
    DecoupledDimension, DecoupledFactTable
from pygrametl.JDBCConnectionWrapper import JDBCConnectionWrapper
from pygrametl.parallel import shareconnectionwrapper

# The data is read from a csv file
inputdata = CSVSource(f=open('sales.csv', 'r'), delimiter=',')

# JDBC and Jython are used as threads usually provide better performance
import java.sql.DriverManager
jconn = java.sql.DriverManager.getConnection(
    "jdbc:postgresql://localhost/dw?user=dwuser&password=dwpass")

# The connection wrapper is itself wrapped by shareconnectionwrapper so it
# can be shared by multiple decoupled tables in a safe manner
conn = JDBCConnectionWrapper(jdbcconn=jconn)
shrdconn = shareconnectionwrapper(targetconnection=conn)

# The product dimension is decoupled and runs in a separate thread, allowing
# it to be accessed by other decoupled tables without using the main thread
productDimension = DecoupledDimension(
    CachedDimension(
        name='product',
        key='productid',
        attributes=['name', 'price'],
        lookupatts=['name'],
        # The shared connection wrapper must be copied for each decoupled
        # table that uses it to ensure correct interaction with the database
        targetconnection=shrdconn.copy(),
        prefill=True)
)

# The fact table is also decoupled in order to consume the values returned
# from the methods called on the product dimension without blocking the main
# thread while waiting for the database. This allows the main thread to
# perform other operations needed before a full fact is ready
factTable = DecoupledFactTable(
    FactTable(
        name='facttable',
        measures=['sale'],
        keyrefs=['storeid', 'productid', 'dateid'],
        targetconnection=shrdconn.copy()),
    returnvalues=False,
    consumes=[productDimension]
)

# Inserting facts into the database can be done in the same manner as in a
# sequential ETL flow, extraction of data from the product dimension is
# done automatically by pygrametl
for row in inputdata:
    # A new row is created for each fact, as values not needed by a
    # decoupled table that consumes another table can make pygrametl
    # miscalculate when the actual results are ready, making the framework
    # pass a FutureResult to the database which usually raises an error
    fact = {}
    fact['storeid'] = row['storeid']
    fact['productid'] = productDimension.ensure(row)
    fact['dateid'] = row['dateid']
    fact['sale'] = row['sale']
    # Other CPU intensive transformations should be performed here to take
    # advantage of the decoupled tables automatically exchanging data
    factTable.insert(fact)

shrdconn.commit()
shrdconn.close()
The above example shows a very simple use of decoupled tables in pygrametl. For real-world applications, the queues and buffers should be tuned to match the underlying hardware to maximize the performance of the parallel ETL flow. Although the example uses an instance of CachedDimension and FactTable for simplicity, decoupling is supported for all types of dimensions and fact tables, except SubprocessFactTable on CPython as it already runs in its own process. Decoupling tables that require a large amount of processing when their methods are called, like a SnowflakedDimension, can help increase performance as the main process is not blocked while waiting for the database to perform the joins.
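As a sketch of such tuning, the decoupled classes are expected to accept arguments that control how many rows are transferred per batch and how many batches may be queued between the processes. The batchsize and queuesize arguments used below, and their values, are assumptions that should be verified against the API documentation and adjusted to the actual workload; shrdconn is the shared connection wrapper from the example above.
from pygrametl.tables import CachedDimension, DecoupledDimension

# Larger batches and queues reduce synchronization overhead but increase
# memory usage and latency; the values below are purely illustrative
productDimension = DecoupledDimension(
    CachedDimension(
        name='product',
        key='productid',
        attributes=['name', 'price'],
        lookupatts=['name'],
        targetconnection=shrdconn.copy(),
        prefill=True),
    batchsize=1000,
    queuesize=50
)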
If a user-defined function needs to access the database and be synchronized with the decoupled tables, it must be passed to shareconnectionwrapper(). An example of such a function is the bulk loader used for pygrametl's BulkFactTable.
from pygrametl.JDBCConnectionWrapper import JDBCConnectionWrapper
from pygrametl.parallel import shareconnectionwrapper

# JDBC and Jython are used as threads usually provide better performance
import java.sql.DriverManager
jconn = java.sql.DriverManager.getConnection(
    "jdbc:postgresql://localhost/dw?user=dwuser&password=dwpass")

# A user-defined function that can bulk load data into PostgreSQL over JDBC
def bulkloader(name, attributes, fieldsep, rowsep, nullval, filehandle):
    global jconn
    copymgr = jconn.getCopyAPI()
    sql = "COPY %s(%s) FROM STDIN WITH DELIMITER '%s'" % \
          (name, ', '.join(attributes), fieldsep)
    copymgr.copyIn(sql, filehandle)

# The connection wrapper is itself wrapped by shareconnectionwrapper so it
# can be shared by multiple decoupled tables in a safe manner. The function
# bulkloader is given to shareconnectionwrapper so the shared connection
# wrapper can ensure that the bulk loading function is synchronized with
# the decoupled tables using the shared connection wrapper
conn = JDBCConnectionWrapper(jdbcconn=jconn)
scw = shareconnectionwrapper(targetconnection=conn, userfuncs=[bulkloader])
Partitioning Tables¶
If a particular dimension or fact table requires more processing than the other tables, it can be beneficial to split it into multiple partitions so operations on it can be performed in parallel, reducing the time needed to process that particular table. pygrametl supports partitioning of tables through multiple features. Firstly, the classes DimensionPartitioner and FactTablePartitioner automate the partitioning of rows between multiple decoupled dimensions or fact tables. How the rows are partitioned is determined by a partitioning function with the signature func(dict). If no function is passed, a default partitioning function is used as documented in the API. Secondly, to ensure that unique surrogate keys are assigned to all rows in a partitioned table, a shared sequence factory can be created using the function getsharedsequencefactory(). Each parallel process is then given a unique set of numbers to use as surrogate keys, ensuring that all surrogate keys are unique even though they are assigned by separate processes.
from pygrametl.datasources import CSVSource
from pygrametl.tables import FactTable, CachedDimension, \
    DecoupledDimension, DecoupledFactTable, DimensionPartitioner
from pygrametl.parallel import shareconnectionwrapper, \
    getsharedsequencefactory
from pygrametl.JDBCConnectionWrapper import JDBCConnectionWrapper

sales = CSVSource(f=open('sales.csv', 'r'), delimiter=',')

# JDBC and Jython are used as threads usually provide better performance
import java.sql.DriverManager
jconn = java.sql.DriverManager.getConnection(
    "jdbc:postgresql://localhost/dw?user=dwuser&password=dwpass")

# The connection wrapper is itself wrapped by shareconnectionwrapper so it
# can be shared by multiple decoupled tables in a safe manner
conn = JDBCConnectionWrapper(jdbcconn=jconn)
shrdconn = shareconnectionwrapper(targetconnection=conn)

# A shared sequence factory is created which provides values starting at
# zero. It gives each table a sequence of numbers to use as surrogate keys.
# The size of each sequence can be increased through a second argument if
# the shared sequence factory becomes a bottleneck in the ETL flow
idfactory = getsharedsequencefactory(0)

# The product dimension must use the shared sequence factory to ensure that
# the two processes do not assign overlapping surrogate keys to the rows
productDimensionOne = DecoupledDimension(
    CachedDimension(
        name='product',
        key='productid',
        attributes=['name', 'price'],
        lookupatts=['name'],
        idfinder=idfactory(),
        targetconnection=shrdconn.copy(),
        prefill=True)
)

productDimensionTwo = DecoupledDimension(
    CachedDimension(
        name='product',
        key='productid',
        attributes=['name', 'price'],
        lookupatts=['name'],
        idfinder=idfactory(),
        targetconnection=shrdconn.copy(),
        prefill=True)
)

# The partitioning of data is automated by the DimensionPartitioner using
# a hash of the product's name. A FactTablePartitioner is also provided
productDimension = DimensionPartitioner(
    parts=[productDimensionOne, productDimensionTwo],
    partitioner=lambda row: hash(row['name']))

# Only partitioned tables need to use the shared sequence factory, normal
# tables can simply use the default self-incrementing surrogate key
factTable = DecoupledFactTable(
    FactTable(
        name='facttable',
        measures=['sale'],
        keyrefs=['storeid', 'productid', 'dateid'],
        targetconnection=shrdconn.copy()),
    returnvalues=False,
    # When consuming a partitioned dimension, each part should be consumed
    # separately. A simple way to do so is to use parts, which returns all
    # parts managed by the partitioner
    consumes=productDimension.parts
)

# A partitioned table can be used in the same way as any other pygrametl
# table since the framework takes care of the partitioning behind the scenes
for row in sales:
    # A new row is created for each fact, as values not needed by a
    # decoupled table that consumes another table can make pygrametl
    # miscalculate when the actual results are ready, making the framework
    # pass a FutureResult to the database which usually raises an error
    fact = {}
    fact['storeid'] = row['storeid']
    fact['dateid'] = row['dateid']
    fact['productid'] = productDimension.ensure(row)
    fact['sale'] = row['sale']
    # Other CPU intensive transformations should be performed here to take
    # advantage of the decoupled tables automatically exchanging data
    factTable.insert(fact)

shrdconn.commit()
shrdconn.close()
The above example shows how to partition the data of the product dimension between multiple decoupled tables. This allows operations on the dimension to be performed by two different processes. The rows are partitioned using hash partitioning on the attribute name. A shared sequence factory is used to provide surrogate keys for the product dimension, as using a self-incrementing key in each process would assign the same value to multiple rows. This is not needed for the fact table, as only one table handles all operations on the fact table in the database, so a simple self-incrementing key is fine.
Splitpoints¶
As CPU intensive operations are often performed in user-defined functions, the decorator splitpoint() is provided. This decorator serves the same purpose for functions as the decoupled classes do for tables: a number of processes are spawned to run the annotated function. The number of processes to spawn can be passed to the decorator, allowing more processes to be created for functions with a longer run time. The first time an annotated function is called, a process is created to handle the call, and this continues until the number of created processes matches the argument given to the decorator. After that, if no process is available, the call and its arguments are added to a queue shared by the processes created for the splitpoint. If a split function calls another function that requires synchronization, that function can be annotated with a new splitpoint with one as the argument, specifying that only one process is allowed to call it at a time. To ensure all annotated functions have finished, the function endsplits() must be called, which joins all processes created by splitpoints up to that point.
from pygrametl.tables import FactTable
from pygrametl.datasources import CSVSource
from pygrametl.parallel import splitpoint, endsplits
from pygrametl.JDBCConnectionWrapper import JDBCConnectionWrapper

sales = CSVSource(f=open('sales.csv', 'r'), delimiter=',')

# JDBC and Jython are used as threads usually provide better performance
import java.sql.DriverManager
jconn = java.sql.DriverManager.getConnection(
    "jdbc:postgresql://localhost/dw?user=dwuser&password=dwpass")
conn = JDBCConnectionWrapper(jdbcconn=jconn)

factTable = FactTable(
    name='facttable',
    measures=['sale'],
    keyrefs=['storeid', 'productid', 'dateid']
)

# Five threads are created to run this function, so five rows can be
# transformed at the same time. If no threads are available, the row
# is added to a queue and transformed when a thread becomes idle
@splitpoint(instances=5)
def performExpensiveTransformations(row):
    # Do some (expensive) transformations...
    # As multiple threads perform the operations inside this function, a
    # second function must be created to synchronize the database inserts
    insertRowIntoData(row)

# The function is annotated with an argument-free splitpoint, so instances
# defaults to one, meaning this function runs in only one thread
@splitpoint
def insertRowIntoData(row):
    factTable.insert(row)

# The CSV file is read by the main thread, then each row is transformed by
# one of five threads, before being added to the database by a sixth thread
for row in sales:
    performExpensiveTransformations(row)

# To ensure that all splitpoint annotated functions are finished before
# the ETL flow is terminated, the function endsplits must be called as it
# joins all the threads created by splitpoints up to this point
endsplits()
conn.commit()
conn.close()
The above example shows how to use splitpoints. Here, a computationally expensive function is annotated with a splitpoint that is given the argument five, allowing five processes to run the function at the same time. The second splitpoint, without an argument, ensures that only one process is allowed to execute that function at a time, so even though it is called from performExpensiveTransformations(), only one process inserts rows into the fact table at a time. Should the operations on the fact table become a bottleneck, it could be partitioned using FactTablePartitioner. To ensure that all splitpoints have finished execution, the function endsplits() is called, which joins all splitpoints, before the database connection is closed.
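As a sketch of that suggestion, a FactTablePartitioner is created in the same way as the DimensionPartitioner shown earlier. The parts and partitioner arguments below mirror that example, the hash-based partitioning function is only an illustrative choice, and shrdconn is assumed to be a shared connection wrapper created as in the previous examples.
from pygrametl.tables import FactTable, DecoupledFactTable, \
    FactTablePartitioner

# Two decoupled fact tables operate on the same table in the database so
# inserts can be processed in parallel; nothing consumes their results, so
# returnvalues is set to False as in the earlier examples
factTableOne = DecoupledFactTable(
    FactTable(
        name='facttable',
        measures=['sale'],
        keyrefs=['storeid', 'productid', 'dateid'],
        targetconnection=shrdconn.copy()),
    returnvalues=False)

factTableTwo = DecoupledFactTable(
    FactTable(
        name='facttable',
        measures=['sale'],
        keyrefs=['storeid', 'productid', 'dateid'],
        targetconnection=shrdconn.copy()),
    returnvalues=False)

# The partitioner distributes the facts between the two parts, here by
# hashing one of the key references, and is then used like a single table
factTable = FactTablePartitioner(
    parts=[factTableOne, factTableTwo],
    partitioner=lambda row: hash(row['storeid']))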
As splitpoint annotated functions run in separate processes, any values they return are not available to the process calling them. To work around this restriction, a queue can be passed as an argument to splitpoint, and the split function's return values are then added to this queue.
from pygrametl.datasources import CSVSource
from pygrametl.parallel import splitpoint, endsplits
from pygrametl.jythonmultiprocessing import Queue

queue = Queue()
sales = CSVSource(f=open('sales.csv', 'r'), delimiter=',')

# A queue is passed to the decorator, which uses it to store return values
@splitpoint(instances=5, output=queue)
def expensiveReturningOperation(row):
    # Some special value, in this case None, is used to indicate that no
    # more data will be given to the queue and that processing can continue
    if row is None:
        return None
    # Returned values are automatically added to the queue for others to use
    return row

# Each row in sales.csv is extracted and passed to the function
for row in sales:
    expensiveReturningOperation(row)

# A simple sentinel value can be used to indicate that all rows have been
# processed and that the loop using the results below can break
expensiveReturningOperation(None)

# An infinite loop is used to process the returned values as the number of
# returned rows is unknown, so a sentinel value and a break are used instead
while True:
    # Extract the processed row returned by the annotated function, with a
    # simple sentinel value indicating when the processing is done
    elem = queue.get()
    if elem is None:
        break
    # Use the returned elements after the sentinel check to prevent errors
    # ......

# To ensure that all splitpoint annotated functions are finished before
# the ETL flow is terminated, the function endsplits must be called as it
# joins all the processes created by splitpoints up to this point
endsplits()
Flows¶
Another way to parallelize transformations is to use flows. In pygrametl, a flow is a sequence of functions with the same interface, each running in its own process, and each row given to the flow is passed from one function to the next in the sequence. A flow can be created from multiple functions using the createflow() function. After a flow is created, it can be called just like any other function. Internally, the arguments are passed from the first function to the last, and any returned values are ignored, so the functions must modify the rows in place. Unlike splitpoint(), arguments are passed between the processes in batches and not as single values to reduce the overhead of synchronization.
from pygrametl.tables import Dimension
from pygrametl.datasources import CSVSource
from pygrametl.parallel import splitpoint, endsplits, createflow
from pygrametl.JDBCConnectionWrapper import JDBCConnectionWrapper

# JDBC and Jython are used as threads usually provide better performance
import java.sql.DriverManager
jconn = java.sql.DriverManager.getConnection(
    "jdbc:postgresql://localhost/dw?user=dwuser&password=dwpass")
conn = JDBCConnectionWrapper(jdbcconn=jconn)

products = CSVSource(f=open('product.csv', 'r'), delimiter=',')

productDimension = Dimension(
    name='product',
    key='productid',
    attributes=['name', 'price'],
    lookupatts=['name'])

# Two functions are defined to transform the information in product.csv
def normaliseProductNames(row):
    # Expensive operations should be performed in a flow; this example is
    # so simple that the gain is negated by the synchronization overhead
    row['name'] = row['name'].lower()

def convertPriceToThousands(row):
    # Expensive operations should be performed in a flow; this example is
    # so simple that the gain is negated by the synchronization overhead
    row['price'] = int(row['price']) / 1000

# A flow is created from the two functions defined above, this flow can then
# be called just like any other function despite being parallelized
flow = createflow(normaliseProductNames, convertPriceToThousands)

# The data is read from product.csv in a splitpoint so the main process
# does not have to both read the input data and load it into the table
@splitpoint
def producer():
    for row in products:
        flow(row)
    # The flow should be closed when there is no more data available,
    # meaning no more data is accepted but the computations will finish
    flow.close()

# The producer is called and the separate process starts to read the input
producer()

# The simplest way to extract rows from a flow is to iterate over it, but
# additional functions to get the results as a list are available
for row in flow:
    productDimension.insert(row)

endsplits()
conn.commit()
A flow is used in the above example to combine multiple functions, each transforming the rows from product.csv. By creating a flow from these functions, a process is created for each of them to increase the ETL flow's throughput. The overhead of transferring data between the functions is reduced through batching. Rows are given to the flow in the function producer(), which runs in a separate process using a splitpoint so the main process can load the transformed rows into the database by iterating over the flow.