parallel

This module contains methods and classes for making parallel ETL flows. Note that this module in many cases will give better results with Jython (where it uses threads) than with CPython (where it uses processes).

class pygrametl.parallel.Decoupled(obj, returnvalues=True, consumes=(), directupdatepositions=(), batchsize=500, queuesize=200, autowrap=True)

Bases: object

shutdowndecoupled()

Let the Decoupled instance finish its tasks and stop it.

The Decoupled instance should not be used after this.

pygrametl.parallel.createflow(*functions, **options)

Create a flow of functions running in different processes.

A Flow object ready for use is returned.

A flow consists of several functions running in several processes. A flow created by

flow = createflow(f1, f2, f3)

uses three processes. Data can be inserted into the flow by calling it as in flow(data). The argument data is then first processed by f1(data), then f2(data), and finally f3(data). Return values from f1, f2, and f3 are not preserved, but their side-effects are. The functions in a flow should all accept the same number of arguments (*args are also okay).

Internally, a Flow object groups calls together in batches to reduce communication costs (see also the description of arguments below). In the example above, f1 could thus work on one batch, while f2 works on another batch and so on. Flows are thus good to use even if there are many calls of relatively fast functions.

When no more data is to be inserted into a flow, it should be closed by calling its close method.

Data processed by a flow can be fetched by calling get/getall or by simply iterating the flow. This can be done either by the process that inserted the data into the flow or by another (possibly concurrent) process. All data put into a flow should eventually be fetched again, as it will otherwise remain in memory.

Arguments:

  • *functions: A sequence of functions or sequences of functions. Each element in the sequence will be executed in a separate process. For example, the argument (f1, (f2, f3), f4) means that f1 executes in process-1, f2 and f3 execute in process-2, and f4 executes in process-3. The functions in the sequence should all accept the same number of arguments.

  • **options: keyword arguments configuring details. The considered options are:

    • batchsize: an integer deciding how many function calls are “grouped together” before they are passed on between processes. The default is 500.

    • queuesize: an integer deciding the maximum number of batches that can wait in a JoinableQueue between two different processes. 0 means that there is no limit. The default is 25.
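As a rough illustration of the pipeline idea only: the sketch below chains functions through worker threads and queues, so that each function stage runs concurrently and data moves downstream after each stage's side effects. The name make_flow is invented here, and batching, multiple processes, and result fetching (get/getall) are all omitted; pygrametl's createflow handles those internally.

```python
import queue
import threading

def make_flow(*functions):
    """Chain functions so that each runs in its own worker thread.

    Return values are discarded; only side effects are kept, matching
    the semantics described for createflow above.
    """
    stages = [queue.Queue() for _ in functions]

    def worker(func, inq, outq):
        while True:
            item = inq.get()
            if item is None:          # sentinel: shut down and propagate
                if outq is not None:
                    outq.put(None)
                break
            func(item)                # called for its side effects only
            if outq is not None:
                outq.put(item)        # pass the same data downstream

    threads = []
    for i, func in enumerate(functions):
        outq = stages[i + 1] if i + 1 < len(stages) else None
        thread = threading.Thread(target=worker, args=(func, stages[i], outq))
        thread.start()
        threads.append(thread)

    def insert(data):
        stages[0].put(data)

    def close():
        stages[0].put(None)           # let the sentinel ripple through
        for thread in threads:
            thread.join()

    return insert, close
```

With this sketch, insert(data) corresponds to flow(data) and close() to flow.close(): each inserted item is seen by every stage, in order, while the stages themselves run concurrently.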

pygrametl.parallel.endsplits()

Wait for all splitpoints to finish.

pygrametl.parallel.getsharedsequencefactory(startvalue, intervallen=5000)

Create a factory for parallel readers of a sequence.

Returns a callable f. When f() is called, it returns a callable g. Whenever g(*args) is called, it returns a unique int from a sequence (if several g's are created, the order of the calls may mean that the returned ints are not ordered, but they will be unique). The arguments to g are accepted but ignored. Thus, g can be used as the idfinder for [Decoupled]Dimensions.

The different g’s can be used safely from different processes and threads.

Arguments:

  • startvalue: The first value to return. If None, 0 is assumed.

  • intervallen: The amount of numbers that a single g from above can return before synchronization is needed to get a new amount. Default: 5000.
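The interval-based scheme above can be sketched as follows. This is a simplified, single-module illustration (the names are invented, and it is not pygrametl's implementation): a central allocator hands out blocks of intervallen consecutive numbers, and each reader g only synchronizes when its local block is exhausted, so most calls of g need no locking at all.

```python
import itertools
import threading

def sequence_factory(startvalue=0, intervallen=5000):
    counter = itertools.count(startvalue, intervallen)  # block start values
    lock = threading.Lock()

    def factory():
        # Each g is intended for use by a single reader, so its local
        # block state needs no locking of its own.
        state = {"next": None, "end": None}

        def g(*ignored):
            if state["next"] is None or state["next"] >= state["end"]:
                with lock:                 # fetch a fresh block of numbers
                    state["next"] = next(counter)
                state["end"] = state["next"] + intervallen
            value = state["next"]
            state["next"] += 1
            return value

        return g

    return factory
```

Numbers returned by different g's are always unique, but not globally ordered: a reader works through its whole block before fetching the next one, exactly as described for getsharedsequencefactory above.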

pygrametl.parallel.shareconnectionwrapper(targetconnection, maxclients=10, userfuncs=())

Share a ConnectionWrapper between several processes/threads.

When Decoupled objects are used, they can try to update the DW at the same time. They can use several ConnectionWrappers to avoid race conditions, but this is not transactionally safe. Instead, they can use a “shared” ConnectionWrapper obtained through this function.

When a ConnectionWrapper is shared, it executes in a separate process (or thread, if Jython is used) and ensures that only one operation takes place at a time. This is hidden from the users of the shared ConnectionWrapper, who see an interface similar to the normal ConnectionWrapper.

When this function is called, it returns a SharedConnectionWrapperClient which can be used as a normal ConnectionWrapper. Each process (i.e., each Decoupled object) should, however, get a unique SharedConnectionWrapperClient by calling copy() on the returned SharedConnectionWrapperClient.

Note that a shared ConnectionWrapper needs to hold the complete result of each query in memory until it is fetched by the process that executed the query. Again, this is hidden from the users.

It is also possible to add methods to a shared ConnectionWrapper when it is created. When this is done and the method is invoked, no other operation will modify the DW at the same time. If, for example, the functions foo and bar are added to a shared ConnectionWrapper (by passing the argument userfuncs=(foo, bar) to shareconnectionwrapper), the returned SharedConnectionWrapperClient will offer the methods foo and bar which when called will be running in the separate process for the shared ConnectionWrapper. This is particularly useful for user-defined bulk loaders as used by BulkFactTable:

    def bulkload():
        # DBMS-specific code here. No other DW operation should
        # take place concurrently
        ...

    scw = shareconnectionwrapper(ConnectionWrapper(…), userfuncs=(bulkload,))

    facttbl = BulkFactTable(…, bulkloader=scw.copy().bulkload)

Note

The SharedConnectionWrapper must be copied using .copy().

Arguments:

  • targetconnection: a pygrametl ConnectionWrapper

  • maxclients: the maximum number of concurrent clients. Default: 10

  • userfuncs: a sequence of functions to add to the shared ConnectionWrapper. Default: ()
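The serialization idea behind a shared ConnectionWrapper can be sketched like this (all names below are invented for illustration; this is not pygrametl's implementation): a single worker thread executes submitted operations one at a time while each caller blocks until its own result is ready, so no two operations ever touch the target concurrently. The real shareconnectionwrapper additionally proxies the full ConnectionWrapper interface and keeps each query result until the submitting client fetches it.

```python
import queue
import threading

class SerializedExecutor:
    """Run submitted operations one at a time in a single worker thread."""

    def __init__(self):
        self._tasks = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def _run(self):
        while True:
            task = self._tasks.get()
            if task is None:               # sentinel: shut down
                break
            func, args, done = task
            done["result"] = func(*args)   # only the worker runs operations
            done["event"].set()            # wake up the waiting caller

    def execute(self, func, *args):
        """Submit func and block until the worker has run it."""
        done = {"event": threading.Event(), "result": None}
        self._tasks.put((func, args, done))
        done["event"].wait()
        return done["result"]

    def close(self):
        self._tasks.put(None)
        self._worker.join()
```

Because every operation funnels through the one worker, a user function added this way (like the bulkload example above) is guaranteed that no other operation modifies the target while it runs.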

pygrametl.parallel.splitpoint(*arg, **kwargs)

To be used as an annotation to make a function run in a separate process.

Each call of a @splitpoint annotated function f involves adding the request (and arguments, if any) to a shared queue. This can be relatively expensive if f only uses little computation time. The benefits from @splitpoint are thus best obtained for a function f which is time-consuming. To wait for all splitpoints to finish their computations, call endsplits().

@splitpoint can be used as in the following examples:

    @splitpoint
    def f(args):
        ...

The simplest case. Makes f run in a separate process. All calls of f will return None immediately, and f will be invoked in the separate process.

    @splitpoint()
    def g(args):
        ...

With parentheses. Has the same effect as the previous example.

    @splitpoint(output=queue, instances=2, queuesize=200)
    def h(args):
        ...

With keyword arguments. It is not required that all of the keyword arguments above are given.

Keyword arguments:

  • output: If given, it should be a queue-like object (offering the .put(obj) method). The annotated function's results will then be put in the output.

  • instances: Determines how many processes should run the function. Each of the processes will have the value parallel.splitno set to a unique value between 0 (incl.) and instances (excl.).

  • queuesize: Given as an argument to a multiprocessing.JoinableQueue which holds arguments to the annotated function while they wait for an idle process that will pass them on to the annotated function. The argument decides the maximum number of calls that can wait in the queue. 0 means unlimited. Default: 0
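The mechanics described above can be sketched with a minimal decorator (the name splitpoint_sketch is invented, and it uses a worker thread rather than a separate process for brevity): calls of the annotated function return None immediately, the work happens asynchronously from a queue of pending calls, and results can optionally go to an output queue.

```python
import queue
import threading

def splitpoint_sketch(output=None, queuesize=0):
    def decorate(func):
        # Pending calls wait here; maxsize=0 means unlimited, as for
        # the queuesize argument of @splitpoint.
        calls = queue.Queue(maxsize=queuesize)

        def worker():
            while True:
                args = calls.get()
                if args is None:           # sentinel: shut down
                    break
                result = func(*args)
                if output is not None:
                    output.put(result)     # deliver the result, if wanted

        thread = threading.Thread(target=worker, daemon=True)
        thread.start()

        def wrapper(*args):
            calls.put(args)                # returns immediately; func runs later

        # Stand-in for endsplits(): drain the queue and stop the worker.
        wrapper.endsplit = lambda: (calls.put(None), thread.join())
        return wrapper

    return decorate
```

For example, decorating a function with splitpoint_sketch(output=out) makes each call enqueue its arguments and return at once, with the computed results appearing in out after the worker has processed them; calling the endsplit stand-in corresponds to endsplits() in the real module.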