Parallel

pygrametl provides multiple abstractions that simplify the creation of parallel ETL flows to take advantage of modern multi-core and multi-processor systems. Firstly, any data source can be read in a separate process using ProcessSource. Further parallelism can be achieved by decoupling tables from the main process and allowing these decoupled tables to communicate with each other without interrupting the main process. Tables can also be partitioned so operations on large tables can be performed by multiple processes. Both decoupled and partitioned tables can be found in the tables module. To support database connections from multiple decoupled tables, a ConnectionWrapper or JDBCConnectionWrapper must be wrapped by the function shareconnectionwrapper() before it is used by multiple decoupled tables.

pygrametl also provides abstractions for running functions in parallel. The decorator splitpoint() can be used to annotate functions that should run in separate processes. This supplements the decoupled tables, as rows are often transformed by a set of functions before they are inserted into a database table. Splitpoints can be synchronized using the function endsplits(). The function createflow() can be used to create a sequence of functions that run in separate processes. In a flow, a row is first given to the first function, then the second, and so forth. This also means that the passed row must be modified in place, as the functions' return values are ignored.

Note

pygrametl supports executing parallel ETL flows using CPython (only on platforms that start new processes using fork) or Jython. The method used by CPython to start a process can be determined using multiprocessing.get_start_method(). Unix-like operating systems generally use fork by default, but some must be configured to use fork through multiprocessing.set_start_method(). Microsoft Windows does not support fork, thus a Unix-like environment must be installed, e.g., using Windows Subsystem for Linux.
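On CPython, the start method can be checked and, if necessary, requested explicitly before any processes are started. The snippet below is a minimal sketch using only the standard multiprocessing module; it is not part of pygrametl itself.

import multiprocessing

# Request fork explicitly before any processes are started; this raises an
# error on platforms without fork support, such as Microsoft Windows
if multiprocessing.get_start_method(allow_none=True) is None:
    multiprocessing.set_start_method('fork')

# Verify which start method is actually in effect
print(multiprocessing.get_start_method())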

Due to CPython's GIL, Jython should be used to run ETL flows that use pygrametl's parallel constructs. This is because Jython allows threads to be used for parallel processing, while processes must be used in CPython. Thus, the term process is used to denote either a process or a thread, depending on the Python implementation in question. For more information on using pygrametl with Jython, see Jython.

ProcessSource

ProcessSource is a data source that allows other data sources to be iterated in a separate process. A data source in pygrametl is an abstraction that provides access to many types of data through a normal Python iterator. For more information about data sources, see Data Sources.

from pygrametl.tables import FactTable, CachedDimension
from pygrametl.datasources import CSVSource, ProcessSource, \
        TransformingSource
from pygrametl.JDBCConnectionWrapper import JDBCConnectionWrapper

# JDBC and Jython are used as threads usually provide better performance
import java.sql.DriverManager
jconn = java.sql.DriverManager.getConnection(
    "jdbc:postgresql://localhost/dw?user=dwuser&password=dwpass")
conn = JDBCConnectionWrapper(jdbcconn=jconn)

factTable = FactTable(
    name='facttable',
    measures=['sale'],
    keyrefs=['storeid', 'productid', 'dateid'])

productTable = CachedDimension(
        name='product',
        key='productid',
        attributes=['name', 'price'],
        lookupatts=['name'])


# A set of computationally expensive functions is needed to transform the
# facts before they can be inserted into the fact table. Each function must
# be defined as func(row) so a TransformingSource can combine them before
# they are passed to ProcessSource and run in another thread
def convertReals(row):
    # Converting a string encoding of a float to an integer must be done in
    # two steps: the string is first converted to a float and then to an integer
    row['sale'] = int(float(row['sale']))


def trimProductname(row):
    row['name'] = row['name'].strip()


# In the transformation, three data sources are used to retrieve rows from
# sales.csv: first a CSVSource to read the CSV file, then a
# TransformingSource to transform the rows, and lastly a ProcessSource to
# perform both the reading and the transformation in another thread
sales = CSVSource(f=open('sales.csv'), delimiter=',')
transSales = TransformingSource(sales, convertReals, trimProductname)
salesProcess = ProcessSource(transSales)

# While the list of sales is being read and transformed by the spawned
# thread, the main thread is occupied with pre-loading the product dimension
# with data from product.csv
products = CSVSource(f=open('product.csv'), delimiter=',')
for row in products:
    productTable.insert(row)

# After the ProcessSource has read rows from the data source it was given,
# they can be accessed through the ProcessSource's iterator like any other
# data source
for row in salesProcess:
    row['productid'] = productTable.lookup(row)
    factTable.insert(row)
conn.commit()
conn.close()

In the above example, we use a ProcessSource to transform a set of rows from sales.csv while we fill the product dimension with data. As a ProcessSource adds overhead to the iterator, because rows must be transferred in batches from another process, other computations should be performed between the creation and the use of the data source so data can be read, transformed, and transferred in the meantime.
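If the defaults do not fit the hardware or the workload, the batch size and queue length used by a ProcessSource can be adjusted when it is created. The sketch below reuses the transformation functions defined in the example above and assumes the batchsize and queuesize keyword arguments described in the API documentation for ProcessSource; the values shown are only illustrative.

from pygrametl.datasources import CSVSource, ProcessSource, \
        TransformingSource

# The same sources as in the example above, but with larger batches so fewer,
# larger transfers are made between the processes, and a shorter queue so the
# spawned process cannot run as far ahead of the consuming process
sales = CSVSource(f=open('sales.csv'), delimiter=',')
transSales = TransformingSource(sales, convertReals, trimProductname)
salesProcess = ProcessSource(transSales, batchsize=2000, queuesize=5)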

Decoupled Tables

A decoupled table in pygrametl is a proxy for an instance of another table class defined in the tables module. Currently, two classes for decoupled tables exist, DecoupledDimension and DecoupledFactTable. The two classes behave nearly identically, with one implementing the interface of a dimension and the other the interface of a fact table. When a method is called on one of the two classes, a message is sent to the actual table object, and if the method has a return value, an instance of the class FutureResult is returned. This instance is a handle to the actual result when it becomes available. To get the actual result, the instance can either be given directly to a method accepting a row, which forces the method to block until a value is ready, or the entire decoupled table can be consumed by another decoupled table. When a decoupled table is consumed by another decoupled table, the values are extracted from the instances of FutureResult by the table that needs them without blocking the caller of methods on that table. It should, however, be noted that any row passed to an instance of DecoupledFactTable or DecoupledDimension should only contain the attributes directly needed by the table, as additional key/value pairs in the dict can make pygrametl insert the row before the actual values are ready, leading to instances of the class FutureResult being incorrectly passed to the database instead.

from pygrametl.datasources import CSVSource
from pygrametl.tables import FactTable, CachedDimension,\
     DecoupledDimension, DecoupledFactTable
from pygrametl.JDBCConnectionWrapper import JDBCConnectionWrapper
from pygrametl.parallel import shareconnectionwrapper

# The data is read from a csv file
inputdata = CSVSource(f=open('sales.csv', 'r'), delimiter=',')

# JDBC and Jython are used as threads usually provide better performance
import java.sql.DriverManager
jconn = java.sql.DriverManager.getConnection(
    "jdbc:postgresql://localhost/dw?user=dwuser&password=dwpass")

# The connection wrapper is itself wrapped in a SharedConnectionWrapperClient,
# so it can be shared by multiple decoupled tables in a safe manner
conn = JDBCConnectionWrapper(jdbcconn=jconn)
shrdconn = shareconnectionwrapper(targetconnection=conn)

# The product dimension is decoupled and runs in a separate thread allowing
# it to be accessed by other decoupled tables without using the main thread
productDimension = DecoupledDimension(
    CachedDimension(
        name='product',
        key='productid',
        attributes=['name', 'price'],
        lookupatts=['name'],
        # The SharedConnectionWrapperClient must be copied for each
        # decoupled table that uses it to ensure correct interaction
        # with the database
        targetconnection=shrdconn.copy(),
        prefill=True)
    )

# The fact table is also decoupled in order to consume the values returned
# from the methods called on the product dimension without blocking the main
# thread while waiting for the database, thus allowing the main thread to
# perform other operations needed before a full fact is ready
factTable = DecoupledFactTable(
    FactTable(
        name='facttable',
        measures=['sale'],
        keyrefs=['storeid', 'productid', 'dateid'],
        targetconnection=shrdconn.copy()),
    returnvalues=False,
    consumes=[productDimension]
    )

# Inserting facts into the database can be done in the same manner as in a
# sequential ETL flow; the extraction of data from the product dimension is
# performed automatically by pygrametl
for row in inputdata:
    # A new row is created for each fact, as values not needed by a
    # decoupled table that consumes another table can make pygrametl
    # miscalculate when the actual results are ready, making the framework
    # pass a FutureResult to the database, which usually raises an error
    fact = {}
    fact['storeid'] = row['storeid']
    fact['productid'] = productDimension.ensure(row)
    fact['dateid'] = row['dateid']
    fact['sale'] = row['sale']
    # Other CPU-intensive transformations should be performed here to take
    # advantage of the decoupled tables automatically exchanging data
    factTable.insert(fact)
shrdconn.commit()
shrdconn.close()

The above example shows a very simple use of decoupled tables in pygrametl; for real-world applications, the queues and buffers should be tuned to match the underlying hardware to maximize the performance of the parallel ETL flow. Although the example decouples an instance of CachedDimension and FactTable for simplicity, decoupling is supported for all types of dimensions and fact tables, except SubprocessFactTable on CPython as it already runs in its own process. Decoupling tables that require a large amount of processing when their methods are called, like a SnowflakedDimension, can help increase performance, as the main process no longer blocks while waiting for the database to perform the joins.

If a user-defined function needs to access the database and be synchronized with the decoupled tables, it must be passed to shareconnectionwrapper(). An example of such a function is the bulk loader used by pygrametl's BulkFactTable.

from pygrametl.JDBCConnectionWrapper import JDBCConnectionWrapper
from pygrametl.parallel import shareconnectionwrapper

# JDBC and Jython are used as threads usually provide better performance
import java.sql.DriverManager
jconn = java.sql.DriverManager.getConnection(
    "jdbc:postgresql://localhost/dw?user=dwuser&password=dwpass")


# A user-defined function that can bulk load data into PostgreSQL over JDBC
def bulkloader(name, attributes, fieldsep, rowsep, nullval, filehandle):
    global jconn
    copymgr = jconn.getCopyAPI()
    sql = "COPY %s(%s) FROM STDIN WITH DELIMITER '%s'" % \
          (name, ', '.join(attributes), fieldsep)
    copymgr.copyIn(sql, filehandle)


# The connection wrapper is itself wrapped in a SharedConnectionWrapperClient
# so it can be shared by multiple decoupled tables in a safe manner. The function
# bulkloader is given to shareconnectionwrapper so the shared connection
# wrapper can ensure that the bulk loading function is synchronized with
# the decoupled tables using the shared connection wrapper
conn = JDBCConnectionWrapper(jdbcconn=jconn)
scw = shareconnectionwrapper(targetconnection=conn, userfuncs=[bulkloader])
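The synchronized version of a user function can then be used where the original function would have been, for example as the bulk loader of a BulkFactTable. The sketch below assumes that the SharedConnectionWrapperClient returned by shareconnectionwrapper() exposes the user functions as methods named after them; see the API documentation for the exact access pattern.

from pygrametl.tables import BulkFactTable

# The synchronized bulkloader is accessed through a copy of the shared
# connection wrapper and given to the BulkFactTable, which buffers the facts
# and bulk loads them into the database in batches of bulksize rows
factTable = BulkFactTable(
    name='facttable',
    keyrefs=['storeid', 'productid', 'dateid'],
    measures=['sale'],
    bulkloader=scw.copy().bulkloader,
    bulksize=500000)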

Partitioning Tables

If a particular dimension or fact table requires more processing than the other tables, it can be beneficial to partition it into multiple partitions, thus allowing operations on one table to be performed in parallel to reduce the time needed to process that particular table. pygrametl supports partitioning of tables through multiple features. Firstly, the classes DimensionPartitioner and FactTablePartitioner automate the partitioning of rows across multiple decoupled dimensions or fact tables. How the partitioning is performed is determined by a partitioning function with the signature func(dict). If no function is passed, a default partitioning function is used as documented in the API. Secondly, to ensure that unique surrogate keys are assigned to all rows in a partitioned table, a shared sequence factory can be created using the function getsharedsequencefactory(). Each parallel process is then given a unique set of numbers to use as surrogate keys, ensuring that all surrogate keys are unique despite being assigned by separate processes.

from pygrametl.datasources import CSVSource
from pygrametl.tables import FactTable, CachedDimension, \
    DecoupledDimension, DecoupledFactTable, DimensionPartitioner
from pygrametl.parallel import shareconnectionwrapper, \
    getsharedsequencefactory
from pygrametl.JDBCConnectionWrapper import JDBCConnectionWrapper

sales = CSVSource(f=open('sales.csv', 'r'), delimiter=',')

# JDBC and Jython are used as threads usually provide better performance
import java.sql.DriverManager
jconn = java.sql.DriverManager.getConnection(
    "jdbc:postgresql://localhost/dw?user=dwuser&password=dwpass")

# The connection wrapper is itself wrapped in a SharedConnectionWrapperClient,
# so it can be shared by multiple decoupled tables in a safe manner
conn = JDBCConnectionWrapper(jdbcconn=jconn)
shrdconn = shareconnectionwrapper(targetconnection=conn)

# A sharedsequencefactory is created which provides values starting at zero.
# It gives each table a sequence of numbers to use as surrogate keys. The
# size of the sequence can be increased through a second argument if the
# sharedsequencefactory becomes a bottleneck in the ETL flow
idfactory = getsharedsequencefactory(0)

# The product dimension must use the sharedsequencefactory to ensure that
# the two processes do not assign overlapping surrogate keys to the rows
productDimensionOne = DecoupledDimension(
    CachedDimension(
        name='product',
        key='productid',
        attributes=['name', 'price'],
        lookupatts=['name'],
        idfinder=idfactory(),
        targetconnection=shrdconn.copy(),
        prefill=True)
    )

productDimensionTwo = DecoupledDimension(
    CachedDimension(
        name='product',
        key='productid',
        attributes=['name', 'price'],
        lookupatts=['name'],
        idfinder=idfactory(),
        targetconnection=shrdconn.copy(),
        prefill=True)
    )

# The partitioning of data is automated by the DimensionPartitioner using
# a hash of the product's name. A FactTablePartitioner is also provided
productDimension = DimensionPartitioner(
    parts=[productDimensionOne, productDimensionTwo],
    partitioner=lambda row: hash(row['name']))

# Only partitioned tables need to use the sharedsequencefactory; normal tables
# can use the default self-incrementing surrogate key without any problems
factTable = DecoupledFactTable(
        FactTable(
            name='facttable',
            measures=['sale'],
            keyrefs=['storeid', 'productid', 'dateid'],
            targetconnection=shrdconn.copy()),
        returnvalues=False,
        # When consuming a partitioned dimension, each part should be
        # consumed separately; a simple way to do so is to use parts,
        # which provides all of the parts managed by the partitioner
        consumes=productDimension.parts
        )

# A partitioned table can be used in the same way as any other pygrametl
# table since the framework takes care of the partitioning behind the scenes
for row in sales:
    # A new row is created for each fact, as values not needed by a
    # decoupled table that consumes another table can make pygrametl
    # miscalculate when the actual results are ready, making the framework
    # pass a FutureResult to the database, which usually raises an error
    fact = {}
    fact['storeid'] = row['storeid']
    fact['dateid'] = row['dateid']
    fact['productid'] = productDimension.ensure(row)
    fact['sale'] = row['sale']
    # Other CPU-intensive transformations should be performed here to take
    # advantage of the decoupled tables automatically exchanging data
    factTable.insert(fact)
shrdconn.commit()
shrdconn.close()

The above example shows how to partition the data of the product dimension across multiple decoupled tables. This allows operations on the dimension to be performed by two different processes. The rows are partitioned using hash partitioning on the attribute name. A shared sequence factory is used to provide surrogate keys for the product dimension, as using a self-incrementing key would assign the same value to multiple rows. This is not needed for the fact table, as a single table handles all operations on the fact table in the database, so the default self-incrementing key can be used.

Splitpoints

As CPU-intensive operations are often performed in user-defined functions, the decorator splitpoint() is provided. This decorator serves much the same purpose for functions as the decoupled classes do for tables, as a number of processes are spawned to run the function. The number of processes to spawn can be passed to the decorator, allowing more processes to be created for functions with a longer run time. The first time an annotated function is called, a process is created to handle the call. This continues until the number of created processes matches the argument given to the decorator. After that, if no process is available, the call and its arguments are added to a queue shared by the processes created for the splitpoint. If a split function calls another function that requires synchronization, the called function can be annotated with a new splitpoint with one as the argument, specifying that only one process is allowed to execute that function at a time. To ensure all annotated functions have finished, the function endsplits() must be called; it joins all processes created by splitpoints up to that point.

from pygrametl.tables import FactTable
from pygrametl.datasources import CSVSource
from pygrametl.parallel import splitpoint, endsplits
from pygrametl.JDBCConnectionWrapper import JDBCConnectionWrapper

sales = CSVSource(f=open('sales.csv', 'r'), delimiter=',')

# JDBC and Jython are used as threads usually provide better performance
import java.sql.DriverManager
jconn = java.sql.DriverManager.getConnection(
    "jdbc:postgresql://localhost/dw?user=dwuser&password=dwpass")

conn = JDBCConnectionWrapper(jdbcconn=jconn)

factTable = FactTable(
    name='facttable',
    measures=['sale'],
    keyrefs=['storeid', 'productid', 'dateid']
    )


# Five threads are created to run this function, so five rows can be
# transformed at the same time. If no threads are available, the row
# is added to a queue and transformed when a thread becomes idle
@splitpoint(instances=5)
def performExpensiveTransformations(row):
    # Do some (expensive) transformations...

    # As multiple threads perform the operations inside this function, a second
    # function must be created to synchronize inserting rows into the database
    insertRowIntoData(row)


# The function is annotated with an argument-free splitpoint, so the number of
# instances defaults to one, specifying that this function runs in one thread
@splitpoint
def insertRowIntoData(row):
    factTable.insert(row)


# The CSV file is read by the main thread, then each row is transformed by
# one of five threads, before being added to the database by a sixth thread
for row in sales:
    performExpensiveTransformations(row)

# To ensure that all splitpoint annotated functions are finished before
# the ETL flow is terminated, the function endsplits must be called as it
# joins all the threads created by splitpoints up to this point
endsplits()
conn.commit()
conn.close()

The above example shows how to use splitpoints. Here, a very computationally expensive function is annotated with a splitpoint that is given the argument five, allowing five processes to run the function at the same time. The second splitpoint without an argument ensures that only one process is allowed to execute that function at a time, so even though it is called from performExpensiveTransformations(), only one process inserts rows into the fact table at a time. To ensure that all splitpoints have finished execution, the function endsplits() is called to join all splitpoints before the database connection is closed. Should the operations on the fact table become a bottleneck, the fact table could be partitioned using FactTablePartitioner, as sketched below.
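The sketch below is a minimal illustration of that alternative. It assumes a shared connection wrapper, shrdconn, like the one created in the Partitioning Tables section, and that FactTablePartitioner takes the same parts and partitioner arguments as the DimensionPartitioner used earlier; partitioning on storeid is only an example.

from pygrametl.tables import FactTable, DecoupledFactTable, FactTablePartitioner

# Two decoupled fact tables are created so operations on the fact table can
# be performed by two processes instead of one
factTablePartOne = DecoupledFactTable(
    FactTable(
        name='facttable',
        measures=['sale'],
        keyrefs=['storeid', 'productid', 'dateid'],
        targetconnection=shrdconn.copy()),
    returnvalues=False)

factTablePartTwo = DecoupledFactTable(
    FactTable(
        name='facttable',
        measures=['sale'],
        keyrefs=['storeid', 'productid', 'dateid'],
        targetconnection=shrdconn.copy()),
    returnvalues=False)

# The facts are partitioned between the two decoupled fact tables using a
# hash of storeid, so each part only handles a subset of the facts
factTable = FactTablePartitioner(
    parts=[factTablePartOne, factTablePartTwo],
    partitioner=lambda row: hash(row['storeid']))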

As splitpoint-annotated functions run in separate processes, any values they return are not available to the process calling them. To work around this restriction, a queue can be passed as an argument to splitpoint, to which the split function's return values are then added.

from pygrametl.datasources import CSVSource
from pygrametl.parallel import splitpoint, endsplits
from pygrametl.jythonmultiprocessing import Queue

queue = Queue()
sales = CSVSource(f=open('sales.csv', 'r'), delimiter=',')


# A queue is passed to the decorator, which uses it to store return values
@splitpoint(instances=5, output=queue)
def expensiveReturningOperation(row):

    # Some special value, in this case None, is used to indicate that no
    # more data will be given to the queue and that processing can continue
    if row is None:
        return None

    # Returned values are automatically added to the queue for others to use
    return row


# Each row in sales.csv is extracted and passed to the function
for row in sales:
    expensiveReturningOperation(row)

# A simple sentinel value can be used to indicate that all rows have been
# processed and that the loop using the results below can break
expensiveReturningOperation(None)

# An infinite loop is used to process the returned values, as the number of
# returned rows is unknown, so a sentinel value and a break are used instead
while True:
    # Extract the processed row returned by the annotated function; a
    # simple sentinel value is used to indicate when the processing is done
    elem = queue.get()
    if elem is None:
        break

    # Use the returned elements after the sentinel check to prevent errors
    # ......

# To ensure that all splitpoint annotated functions are finished before
# the ETL flow is terminated, the function endsplits must be called as it
# joins all the processes created by splitpoints up to this point
endsplits()

Flows

Another way to parallelize transformations is to use flows. In pygrametl, a flow is a sequence of functions with the same interface, where each function runs in its own separate process and each row is passed from one function to the next in the sequence. A flow can be created from multiple functions using the createflow() function. After a flow is created, it can be called just like any other function. Internally, the arguments are passed from the first function to the last, and because the arguments themselves are passed along, any returned values are ignored. Unlike splitpoint(), arguments are passed in batches and not as single values to reduce the overhead of synchronization.

from pygrametl.tables import Dimension
from pygrametl.datasources import CSVSource
from pygrametl.parallel import splitpoint, endsplits, createflow
from pygrametl.JDBCConnectionWrapper import JDBCConnectionWrapper

# JDBC and Jython are used as threads usually provide better performance
import java.sql.DriverManager
jconn = java.sql.DriverManager.getConnection(
    "jdbc:postgresql://localhost/dw?user=dwuser&password=dwpass")

conn = JDBCConnectionWrapper(jdbcconn=jconn)

products = CSVSource(f=open('product.csv', 'r'), delimiter=',')

productDimension = Dimension(
        name='product',
        key='productid',
        attributes=['name', 'price'],
        lookupatts=['name'])


# Two functions are defined to transform the information in product.csv
def normaliseProductNames(row):
    # Expensive operations should be performed in a flow; this example is
    # simple, so the performance gain is negated by the synchronization
    row['name'] = row['name'].lower()


def convertPriceToThousands(row):
    # Expensive operations should be performed in a flow; this example is
    # simple, so the performance gain is negated by the synchronization
    row['price'] = int(row['price']) / 1000


# A flow is created from the two functions defined above; this flow can then
# be called just like any other function despite being parallelized
flow = createflow(normaliseProductNames, convertPriceToThousands)


# The data is read from product.csv in a splitpoint so the main process
# does not have to both read the input data and load it into the table
@splitpoint
def producer():
    for row in products:
        flow(row)

    # The flow should be closed when there is no more data available; after
    # this, no more data is accepted, but the remaining computations will finish
    flow.close()


# The producer is called and the separate process starts to read the input
producer()

# The simplest way to extract rows from a flow is just to iterate over it;
# however, additional functions to get the results as a list are available
for row in flow:
    productDimension.insert(row)
endsplits()
conn.commit()

A flow is used in the above example to combine multiple functions, each transforming the rows from product.csv. By creating a flow with these functions, a process is created for each of them to increase the ETL flow's throughput. The overhead of transferring data between the functions is reduced through batching. Rows are provided to the flow in the function producer(), which runs in a separate process using a splitpoint, so the main process can load the transformed rows into the database by iterating over the flow.