datasources

This module holds classes that can be used as data sources. Note that it is easy to create other data sources: a data source must be iterable and provide dicts that map from attribute names to attribute values.
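As an illustration of that contract, a generator function is already a valid data source. The name range_source and its attributes are hypothetical and chosen only for this sketch:

```python
def range_source(n):
    """Hypothetical data source: an iterable that yields one dict per row."""
    for i in range(n):
        # Each row maps attribute names to attribute values.
        yield {"id": i, "square": i * i}


rows = list(range_source(3))
```

Anything that can be used in a for loop and yields dicts in this way should be usable wherever a source argument is expected.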

pygrametl.datasources.BackgroundSource

alias of ProcessSource

pygrametl.datasources.CSVSource

alias of DictReader

class pygrametl.datasources.CrossTabbingSource(source, rowvaluesatt, colvaluesatt, values, aggregator=None, nonevalue=0, sortrows=False)

Bases: object

A source that produces a crosstab from another source

Arguments:

  • source: the data source to pull data from

  • rowvaluesatt: the name of the attribute that holds the values that appear as rows in the result

  • colvaluesatt: the name of the attribute that holds the values that appear as columns in the result

  • values: the name of the attribute that holds the values to aggregate

  • aggregator: the aggregator to use (see pygrametl.aggregators). If not given, pygrametl.aggregators.Sum is used to sum the values

  • nonevalue: the value to return when there is no data to aggregate. Default: 0

  • sortrows: A boolean deciding if the rows should be sorted. Default: False
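The idea of a crosstab can be sketched in plain Python. This is not pygrametl's implementation: the function name crosstab is hypothetical, summing is hard-coded in place of an aggregator object, and the exact shape of the rows that CrossTabbingSource yields is not specified above, so the output layout here is an assumption.

```python
from collections import defaultdict


def crosstab(source, rowvaluesatt, colvaluesatt, values, nonevalue=0):
    """Hypothetical sketch: sum `values` per (row value, column value) pair."""
    sums = defaultdict(dict)
    cols = []  # column values in order of first appearance
    for row in source:
        r, c = row[rowvaluesatt], row[colvaluesatt]
        if c not in cols:
            cols.append(c)
        sums[r][c] = sums[r].get(c, 0) + row[values]
    for r, cells in sums.items():
        out = {rowvaluesatt: r}
        for c in cols:
            # Cells without data get the nonevalue.
            out[c] = cells.get(c, nonevalue)
        yield out


sales = [
    {"store": "A", "year": 2020, "amount": 10},
    {"store": "A", "year": 2021, "amount": 5},
    {"store": "B", "year": 2020, "amount": 7},
    {"store": "A", "year": 2020, "amount": 1},
]
table = list(crosstab(sales, "store", "year", "amount"))
```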

class pygrametl.datasources.DynamicForEachSource(seq, callee)

Bases: object

A source that for each given argument creates a new source that will be iterated by this source.

This is useful, for example, for a directory where a CSVSource should be created for each file.

The user must provide a function that, when called with a single argument, returns a new source to iterate. A DynamicForEachSource instance can be given to several ProcessSource instances.

Arguments:

  • seq: a sequence with the elements for each of which a unique source must be created. The elements are given (one by one) to callee.

  • callee: a function f(e) that must accept elements like those in the seq argument. The function should return a source, which is then iterated by this source. The function is called once for every element in seq.
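The relationship between seq and callee can be sketched sequentially in plain Python. The names for_each_source, files, and open_csv are hypothetical, the in-memory strings stand in for files on disk, and the sketch ignores the sharing among ProcessSource instances mentioned above:

```python
import csv
import io


def for_each_source(seq, callee):
    """Hypothetical sketch: create one source per element and iterate it."""
    for element in seq:
        # callee is called once for every element in seq.
        for row in callee(element):
            yield row


# Stand-in for CSV files in a directory (assumption for this example).
files = {
    "a.csv": "id,name\n1,Ann\n",
    "b.csv": "id,name\n2,Bob\n",
}


def open_csv(name):
    return csv.DictReader(io.StringIO(files[name]))


rows = [dict(r) for r in for_each_source(sorted(files), open_csv)]
```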

class pygrametl.datasources.FilteringSource(source, filter=bool)

Bases: object

A source that applies a filter to another source

Arguments:

  • source: the source to filter

  • filter: a callable f(row). If the result is a true value, the row is passed on. If not, the row is discarded. Default: bool, i.e., Python’s standard boolean conversion, which removes empty rows.
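The filtering rule can be sketched in plain Python. The function name filtering and the sample rows are hypothetical; the sketch only mirrors the behavior described above:

```python
def filtering(source, filter=bool):
    """Hypothetical sketch: pass on rows for which filter(row) is true."""
    for row in source:
        if filter(row):
            yield row


rows = [{"id": 1, "qty": 0}, {}, {"id": 2, "qty": 3}]

# With the default, only the empty dict is dropped, since bool({}) is False.
nonempty = list(filtering(rows))

# A custom predicate receives the whole row.
in_stock = list(filtering(rows, lambda row: row.get("qty", 0) > 0))
```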

class pygrametl.datasources.HashJoiningSource(src1, key1, src2, key2)

Bases: object

A class for equi-joining two data sources.

Arguments:

  • src1: the first source. This source is iterated row by row.

  • key1: the attribute of the first source to use in the join

  • src2: the second source. The rows of this source are all loaded into memory.

  • key2: the attribute of the second source to use in the join.
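A hash-based equi-join following the description above (second source loaded into memory, first source iterated row by row) might look as follows. The function name hash_join is hypothetical, and letting attributes from src2 overwrite same-named attributes from src1 is an assumption of this sketch:

```python
from collections import defaultdict


def hash_join(src1, key1, src2, key2):
    """Hypothetical sketch of an equi-join using an in-memory hash index."""
    index = defaultdict(list)
    for row2 in src2:  # all rows of src2 are held in memory
        index[row2[key2]].append(row2)
    for row1 in src1:  # src1 is streamed row by row
        for row2 in index.get(row1[key1], []):
            joined = dict(row1)
            joined.update(row2)  # assumption: src2 wins on name clashes
            yield joined


orders = [{"oid": 1, "cid": 10}, {"oid": 2, "cid": 11}, {"oid": 3, "cid": 10}]
customers = [{"custid": 10, "name": "Ann"}]
joined_rows = list(hash_join(orders, "cid", customers, "custid"))
```

Because only src2 is materialized, the smaller source is the natural choice for the second argument.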

pygrametl.datasources.JoiningSource

alias of HashJoiningSource

class pygrametl.datasources.MappingSource(source, callables)

Bases: object

A class for iterating a source and applying a function to each column.

Arguments:

  • source: A data source

  • callables: A dict mapping from attribute names to functions to apply to these names, e.g., for type casting: {'id': int, 'salary': float}
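The effect of the callables dict can be sketched in plain Python. The function name mapping is hypothetical, and copying each row before changing it is a choice made for this sketch only:

```python
def mapping(source, callables):
    """Hypothetical sketch: apply one function per named attribute."""
    for row in source:
        out = dict(row)  # copy so the input rows stay untouched
        for att, func in callables.items():
            out[att] = func(out[att])
        yield out


raw = [{"id": "1", "salary": "10.5", "name": "Ann"}]
typed = list(mapping(raw, {"id": int, "salary": float}))
```

Attributes that are not mentioned in callables, such as name here, pass through unchanged.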

class pygrametl.datasources.MergeJoiningSource(src1, key1, src2, key2)

Bases: object

A class for merge-joining two sorted data sources

Arguments:

  • src1: a data source

  • key1: the attribute to use from src1

  • src2: a data source

  • key2: the attribute to use from src2
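A merge join over two sources that are already sorted on their join attributes can be sketched as below. The function name merge_join is hypothetical, and the handling of duplicate keys and of name clashes (src2 wins) is an assumption of this sketch, not a statement about pygrametl's behavior:

```python
from itertools import groupby
from operator import itemgetter


def merge_join(src1, key1, src2, key2):
    """Hypothetical sketch: join two inputs sorted ascending on their keys."""
    groups1 = groupby(src1, key=itemgetter(key1))
    groups2 = groupby(src2, key=itemgetter(key2))
    g1 = next(groups1, None)
    g2 = next(groups2, None)
    while g1 is not None and g2 is not None:
        (k1, rows1), (k2, rows2) = g1, g2
        if k1 < k2:
            g1 = next(groups1, None)  # no partner for k1, advance src1
        elif k1 > k2:
            g2 = next(groups2, None)  # no partner for k2, advance src2
        else:
            rows2 = list(rows2)  # only rows with the current key are buffered
            for r1 in rows1:
                for r2 in rows2:
                    yield {**r1, **r2}
            g1 = next(groups1, None)
            g2 = next(groups2, None)


left = [{"k": 1, "a": "x"}, {"k": 2, "a": "y"}, {"k": 4, "a": "z"}]
right = [
    {"k2": 2, "b": "p"},
    {"k2": 2, "b": "q"},
    {"k2": 3, "b": "r"},
    {"k2": 4, "b": "s"},
]
merged = list(merge_join(left, "k", right, "k2"))
```

Unlike a hash join, neither input is loaded into memory as a whole, which is why both must be sorted beforehand.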

class pygrametl.datasources.PandasSource(dataFrame)

Bases: object

A source for iterating a Pandas DataFrame and casting each row to a dict.

Arguments:

  • dataFrame: A Pandas DataFrame

class pygrametl.datasources.ProcessSource(source, batchsize=500, queuesize=20)

Bases: object

A class for iterating another source in a separate process

Arguments:

  • source: the source to iterate

  • batchsize: the number of rows passed from the worker process each time it passes on a batch of rows. Must be positive. Default: 500

  • queuesize: the maximum number of batches that can wait in a queue between the processes. 0 means unlimited. Default: 20

class pygrametl.datasources.RoundRobinSource(sources, batchsize=500)

Bases: object

A source that reads sets of rows from sources in round-robin fashion

Arguments:

  • sources: a sequence of data sources

  • batchsize: the number of rows to read from a data source before going to the next data source. Must be positive (to empty a source before going to the next, use UnionSource)
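Round-robin reading in batches can be sketched in plain Python. The function name round_robin is hypothetical, and dropping a source as soon as it returns a short batch is an assumption of this sketch:

```python
from itertools import islice


def round_robin(sources, batchsize=500):
    """Hypothetical sketch: take up to batchsize rows from each source in turn."""
    iterators = [iter(s) for s in sources]
    while iterators:
        still_open = []
        for it in iterators:
            batch = list(islice(it, batchsize))
            yield from batch
            if len(batch) == batchsize:
                still_open.append(it)  # a short batch means the source is empty
        iterators = still_open


a = [{"n": 1}, {"n": 2}, {"n": 3}]
b = [{"n": 10}]
order = [row["n"] for row in round_robin([a, b], batchsize=2)]
```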

class pygrametl.datasources.SQLSource(connection, query, names=(), initsql=None, cursorarg=None, parameters=None)

Bases: object

A class for iterating the result set of a single SQL query.

Arguments:

  • connection: the PEP 249 connection to use. NOT a ConnectionWrapper!

  • query: the query that generates the result

  • names: names of attributes in the result. If not set, the names from the database are used. Default: ()

  • initsql: SQL that is executed before the query. The result of this initsql is not returned. Default: None.

  • cursorarg: if not None, this argument is used as an argument when the connection’s cursor method is called. Default: None.

  • parameters: if not None, this sequence or mapping of parameters will be sent when the query is executed.
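How a result set becomes a stream of dicts can be sketched with the standard sqlite3 module, which provides a PEP 249 connection. The function sql_rows and the product table are hypothetical; initsql and cursorarg are left out of this sketch:

```python
import sqlite3


def sql_rows(connection, query, names=(), parameters=None):
    """Hypothetical sketch: yield each result row as a dict."""
    cursor = connection.cursor()
    if parameters is not None:
        cursor.execute(query, parameters)
    else:
        cursor.execute(query)
    # Fall back to the column names reported by the database.
    names = names or [d[0] for d in cursor.description]
    for values in cursor:
        yield dict(zip(names, values))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE product (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO product VALUES (?, ?)", [(1, "pen"), (2, "ink")])

filtered = list(
    sql_rows(
        conn,
        "SELECT id, name FROM product WHERE id >= ? ORDER BY id",
        parameters=(2,),
    )
)
renamed = list(
    sql_rows(conn, "SELECT id, name FROM product ORDER BY id", names=("pid", "pname"))
)
```

The placeholder style (? here) depends on the database driver's paramstyle, so queries with parameters are driver-specific.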

class pygrametl.datasources.SQLTransformingSource(source, temptablename, query, additionalcasts=None, batchsize=10000, perbatch=False, columnnames=None, usetruncate=True, targetconnection=None)

Bases: object

A source that transforms rows from another source by loading them into a temporary table in an RDBMS and then retrieving them using an SQL query.

Warning

Creates, empties, and drops the temporary table.

Arguments:

  • source: a data source that yields rows with the same schema, i.e., they contain the same columns and the columns’ types do not change

  • temptablename: a string with the name of the temporary table to use. This table must use the same schema as the rows from source

  • query: the query that is executed on temptablename in targetconnection or an in-memory SQLite database to transform the rows from source

  • additionalcasts: a dict with additional casts from Python types to SQL types in the form of strings that take precedence over the default. Default: None, i.e., only int, float, and str are mapped to simple SQL types that should be supported by most RDBMSs

  • batchsize: an int deciding how many insert operations should be done in one batch. Default: 10000

  • perbatch: a boolean deciding if the transformation should be applied for each batch or for all rows in source. Default: False, i.e., the transformation is applied once for all rows in source

  • columnnames: a sequence of column names to use for transformed rows. Default: None, i.e., the column names from query are used

  • usetruncate: a boolean deciding if TRUNCATE should be used instead of DELETE FROM when emptying temptablename in targetconnection. Default: True, i.e., TRUNCATE is used instead of DELETE FROM

  • targetconnection: the PEP 249 connection to use, the ConnectionWrapper to use, or None. If None, a new temporary in-memory SQLite database is created
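The load-then-query idea can be sketched with an in-memory SQLite database from the standard library. The function sql_transform is hypothetical; it loads all rows at once, creates untyped columns, and ignores batching, casts, and truncation, so it is only a rough model of the behavior described above:

```python
import sqlite3


def sql_transform(source, temptablename, query):
    """Hypothetical sketch: load rows into a temp table, then query it."""
    rows = list(source)
    if not rows:
        return []
    columns = list(rows[0])  # assumes all rows share the same schema
    conn = sqlite3.connect(":memory:")
    # Names are interpolated directly, so they must come from trusted code.
    conn.execute("CREATE TABLE {} ({})".format(temptablename, ", ".join(columns)))
    placeholders = ", ".join("?" for _ in columns)
    conn.executemany(
        "INSERT INTO {} VALUES ({})".format(temptablename, placeholders),
        [tuple(row[c] for c in columns) for row in rows],
    )
    cursor = conn.execute(query)
    names = [d[0] for d in cursor.description]
    result = [dict(zip(names, values)) for values in cursor]
    conn.close()
    return result


sales = [
    {"store": "A", "amount": 10},
    {"store": "A", "amount": 5},
    {"store": "B", "amount": 7},
]
totals = sql_transform(
    sales,
    "sales",
    "SELECT store, SUM(amount) AS total FROM sales GROUP BY store ORDER BY store",
)
```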

class pygrametl.datasources.TransformingSource(source, *transformations)

Bases: object

A source that applies functions to the rows from another source

Arguments:

  • source: a data source

  • *transformations: the transformations to apply. Must be callables of the form func(row) where row is a dict. Will be applied in the given order.
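Applying transformations in order can be sketched in plain Python. The names transforming, add_total, and drop_qty are hypothetical, and the sketch assumes that each transformation changes the row dict in place and that its return value is ignored:

```python
def transforming(source, *transformations):
    """Hypothetical sketch: apply each func(row) in the given order."""
    for row in source:
        for func in transformations:
            func(row)  # assumption: the row is modified in place
        yield row


def add_total(row):
    row["total"] = row["price"] * row["qty"]


def drop_qty(row):
    del row["qty"]


rows = list(transforming([{"price": 2, "qty": 3}], add_total, drop_qty))
```

The order matters here: drop_qty would make add_total fail if the two were swapped.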

class pygrametl.datasources.TypedCSVSource(f, casts, fieldnames=None, restkey=None, restval=None, dialect='excel', *args, **kwds)

Bases: DictReader

A class for iterating a CSV file and type casting the values.

Arguments:

  • f: An iterable object such as a file. Passed on to csv.DictReader

  • casts: A dict mapping from attribute names to functions to apply to these names, e.g., {'id': int, 'salary': float}

  • fieldnames: Passed on to csv.DictReader

  • restkey: Passed on to csv.DictReader

  • restval: Passed on to csv.DictReader

  • dialect: Passed on to csv.DictReader

  • *args: Passed on to csv.DictReader

  • **kwds: Passed on to csv.DictReader
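The combination of csv.DictReader and casts can be sketched with the standard library alone. The function typed_csv is hypothetical and is written as a generator rather than as a DictReader subclass; io.StringIO stands in for an open file:

```python
import csv
import io


def typed_csv(f, casts, **kwds):
    """Hypothetical sketch: read CSV rows as dicts and cast selected values."""
    for row in csv.DictReader(f, **kwds):
        for att, cast in casts.items():
            row[att] = cast(row[att])
        yield dict(row)


data = io.StringIO("id,name,salary\n1,Ann,10.5\n2,Bob,20\n")
rows = list(typed_csv(data, {"id": int, "salary": float}))
```

Without casts, every value read by csv.DictReader is a string, which is why numeric attributes usually need a cast before loading.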

next()

class pygrametl.datasources.UnionSource(*sources)

Bases: object

A source to union other sources (possibly with different types of rows). All rows are read from the 1st source before rows are read from the 2nd source and so on (to interleave the rows, use a RoundRobinSource)

Arguments:

  • *sources: The sources to union in the order they should be used.
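The sequential behavior described above corresponds to chaining iterables. The function name union is hypothetical; the sketch relies only on itertools.chain from the standard library:

```python
from itertools import chain


def union(*sources):
    """Hypothetical sketch: exhaust each source before moving to the next."""
    return chain.from_iterable(sources)


# Rows from different sources may have different attributes.
rows = list(union([{"id": 1}], [{"name": "x"}, {"id": 2}]))
```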