datasources
This module holds classes that can be used as data sources. It is also easy to create other data sources: a data source must be iterable and must provide dicts that map from attribute names to attribute values.
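As a minimal sketch of that contract, a plain generator function already qualifies as a data source. The name `number_source` and its attributes are hypothetical and only serve as illustration.

```python
def number_source(limit):
    """Hypothetical data source: yields one dict (row) per integer."""
    for i in range(limit):
        # Each row maps attribute names to attribute values.
        yield {"id": i, "square": i * i}


rows = list(number_source(3))
print(rows)
```

A generator can only be iterated once. A class that implements `__iter__` would be a reasonable alternative when the source has to be read several times.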
- pygrametl.datasources.BackgroundSource
alias of ProcessSource
- pygrametl.datasources.CSVSource
alias of DictReader
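Because CSVSource is an alias of DictReader, the behavior can be tried with the standard library's `csv.DictReader` directly. The in-memory file and its contents below are made up for illustration.

```python
import csv
import io

# In-memory stand-in for an open CSV file (hypothetical data).
csvfile = io.StringIO("id,name\n1,Ann\n2,Bob\n")

# The first line is used as the attribute names; every value is a string.
source = csv.DictReader(csvfile)
rows = [dict(row) for row in source]
print(rows)
```

All values come back as strings, which is why type-casting sources such as TypedCSVSource or MappingSource are often placed on top.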
- class pygrametl.datasources.CrossTabbingSource(source, rowvaluesatt, colvaluesatt, values, aggregator=None, nonevalue=0, sortrows=False)
Bases: object
A source that produces a crosstab from another source
Arguments:
source: the data source to pull data from
rowvaluesatt: the name of the attribute that holds the values that appear as rows in the result
colvaluesatt: the name of the attribute that holds the values that appear as columns in the result
values: the name of the attribute that holds the values to aggregate
aggregator: the aggregator to use (see pygrametl.aggregators). If not given, pygrametl.aggregators.Sum is used to sum the values
nonevalue: the value to return when there is no data to aggregate. Default: 0
sortrows: A boolean deciding if the rows should be sorted. Default: False
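The idea of a crosstab with summed values can be sketched in plain Python. This is not pygrametl's implementation: the helper `crosstab_sum` is hypothetical, and the layout of the rows it produces (one dict per row value, one key per column value) is an assumption that may differ from what CrossTabbingSource yields.

```python
from collections import defaultdict


def crosstab_sum(rows, rowvaluesatt, colvaluesatt, values, nonevalue=0):
    """Hypothetical helper: sum `values` per (row value, column value) pair."""
    sums = defaultdict(dict)
    columns = []  # column values in order of first appearance
    for row in rows:
        r, c = row[rowvaluesatt], row[colvaluesatt]
        if c not in columns:
            columns.append(c)
        sums[r][c] = sums[r].get(c, 0) + row[values]
    result = []
    for r, cells in sums.items():
        out = {rowvaluesatt: r}
        for c in columns:
            # nonevalue fills cells for which there was nothing to aggregate.
            out[c] = cells.get(c, nonevalue)
        result.append(out)
    return result


sales = [
    {"store": "A", "year": 2020, "amount": 10},
    {"store": "A", "year": 2021, "amount": 5},
    {"store": "B", "year": 2020, "amount": 7},
    {"store": "A", "year": 2020, "amount": 1},
]
table = crosstab_sum(sales, "store", "year", "amount")
print(table)
```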
- class pygrametl.datasources.DynamicForEachSource(seq, callee)
Bases: object
A source that for each given argument creates a new source that will be iterated by this source.
This is useful, for example, for directories where a CSVSource should be created for each file.
The user must provide a function that when called with a single argument, returns a new source to iterate. A DynamicForEachSource instance can be given to several ProcessSource instances.
Arguments:
seq: a sequence with the elements for each of which a unique source must be created. The elements are given (one by one) to callee.
callee: a function f(e) that must accept elements like those in the seq argument. The function should return a source, which will then be iterated by this source. The function is called once for every element in seq.
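The relationship between seq and callee can be sketched sequentially in plain Python. The names `FILES`, `make_source`, and `for_each_source` are hypothetical, and the sketch ignores the sharing between several ProcessSource instances mentioned above.

```python
import csv
import io

# Hypothetical stand-ins for the files in a directory: name -> CSV text.
FILES = {
    "a.csv": "id,name\n1,Ann\n",
    "b.csv": "id,name\n2,Bob\n3,Cy\n",
}


def make_source(filename):
    """Plays the role of callee: returns a new source for one element."""
    return csv.DictReader(io.StringIO(FILES[filename]))


def for_each_source(seq, callee):
    """Sequential sketch: create one source per element and iterate it."""
    for element in seq:
        for row in callee(element):
            yield dict(row)


rows = list(for_each_source(["a.csv", "b.csv"], make_source))
print(rows)
```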
- class pygrametl.datasources.FilteringSource(source, filter=bool)
Bases: object
A source that applies a filter to another source
Arguments:
source: the source to filter
filter: a callable f(row). If the result is a True value, the row is passed on. If not, the row is discarded. Default: bool, i.e., Python’s standard boolean conversion which removes empty rows.
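The filtering rule can be sketched as a small generator. The function `filtering` is a hypothetical stand-in, not the class itself; its `predicate` parameter plays the role of the filter argument.

```python
def filtering(source, predicate=bool):
    """Pass on only the rows for which predicate(row) is true."""
    for row in source:
        if predicate(row):
            yield row


rows = [{"id": 1, "salary": 100}, {}, {"id": 2, "salary": 0}]

# With the default (bool), only the empty dict is discarded.
non_empty = list(filtering(rows))

# A custom predicate keeps rows with a positive salary.
paid = list(filtering(rows, lambda row: row.get("salary", 0) > 0))
print(non_empty, paid)
```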
- class pygrametl.datasources.HashJoiningSource(src1, key1, src2, key2)
Bases: object
A class for equi-joining two data sources.
Arguments:
src1: the first source. This source is iterated row by row.
key1: the attribute of the first source to use in the join
src2: the second source. The rows of this source are all loaded into memory.
key2: the attribute of the second source to use in the join.
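A hash-based equi-join along these lines can be sketched in plain Python: the second source is loaded into an in-memory table and the first is streamed. The helper `hash_join` is hypothetical, and the way matching rows are merged (attributes of the second row added to a copy of the first) is an assumption.

```python
from collections import defaultdict


def hash_join(src1, key1, src2, key2):
    """Hypothetical equi-join sketch using an in-memory hash table."""
    # Build phase: load every row of the second source into memory.
    table = defaultdict(list)
    for row in src2:
        table[row[key2]].append(row)
    # Probe phase: stream the first source row by row.
    for row in src1:
        for match in table.get(row[key1], ()):
            joined = dict(row)
            joined.update(match)  # assumed merge strategy
            yield joined


orders = [
    {"orderid": 1, "custid": 10},
    {"orderid": 2, "custid": 11},
    {"orderid": 3, "custid": 99},  # no matching customer
]
customers = [{"id": 10, "name": "Ann"}, {"id": 11, "name": "Bob"}]
joined = list(hash_join(orders, "custid", customers, "id"))
print(joined)
```

Since the second source is held in memory, it would usually make sense to pass the smaller of the two sources as src2.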
- pygrametl.datasources.JoiningSource
alias of HashJoiningSource
- class pygrametl.datasources.MappingSource(source, callables)
Bases: object
A class for iterating a source and applying a function to each column.
Arguments:
source: A data source
callables: A dict mapping from attribute names to functions to apply to the values of these attributes, e.g., for type casting: {'id': int, 'salary': float}
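The per-attribute mapping can be sketched as follows. The generator `mapping` is a hypothetical stand-in for the class, and copying each row before changing it is a choice made here, not something the documentation states.

```python
def mapping(source, callables):
    """Apply callables[att] to the value of att in every row."""
    for row in source:
        new_row = dict(row)  # copy so the input rows stay untouched
        for att, func in callables.items():
            new_row[att] = func(new_row[att])
        yield new_row


raw = [{"id": "1", "salary": "1000.5", "name": "Ann"}]
typed = list(mapping(raw, {"id": int, "salary": float}))
print(typed)
```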
- class pygrametl.datasources.MergeJoiningSource(src1, key1, src2, key2)
Bases: object
A class for merge-joining two sorted data sources
Arguments:
src1: a data source
key1: the attribute to use from src1
src2: a data source
key2: the attribute to use from src2
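A merge join over two sorted sources can be sketched with `itertools.groupby`. The helper `merge_join` is hypothetical. It assumes both inputs are sorted in ascending order on their join attribute, and its handling of duplicate keys and of how rows are merged may differ from the class.

```python
from itertools import groupby
from operator import itemgetter


def merge_join(src1, key1, src2, key2):
    """Hypothetical merge-join sketch for two sources sorted on their keys."""
    groups1 = groupby(src1, key=itemgetter(key1))
    groups2 = groupby(src2, key=itemgetter(key2))
    g1 = next(groups1, None)
    g2 = next(groups2, None)
    while g1 is not None and g2 is not None:
        if g1[0] < g2[0]:
            g1 = next(groups1, None)  # left key too small: advance left
        elif g1[0] > g2[0]:
            g2 = next(groups2, None)  # right key too small: advance right
        else:
            right_rows = list(g2[1])
            for left in g1[1]:
                for right in right_rows:
                    joined = dict(left)
                    joined.update(right)  # assumed merge strategy
                    yield joined
            g1 = next(groups1, None)
            g2 = next(groups2, None)


left = [{"id": 1, "a": "x"}, {"id": 2, "a": "y"}, {"id": 4, "a": "z"}]
right = [
    {"key": 2, "b": "p"},
    {"key": 2, "b": "q"},
    {"key": 3, "b": "r"},
    {"key": 4, "b": "s"},
]
joined = list(merge_join(left, "id", right, "key"))
print(joined)
```

Unlike a hash join, only one group of rows per key has to be held in memory at a time, which is the usual reason to prefer a merge join when both inputs are already sorted.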
- class pygrametl.datasources.PandasSource(dataFrame)
Bases: object
A source for iterating a Pandas DataFrame and casting each row to a dict.
Arguments:
dataFrame: A Pandas DataFrame
- class pygrametl.datasources.ProcessSource(source, batchsize=500, queuesize=20)
Bases: object
A class for iterating another source in a separate process
Arguments:
source: the source to iterate
batchsize: the number of rows passed from the worker process each time it passes on a batch of rows. Must be positive. Default: 500
queuesize: the maximum number of batches that can wait in a queue between the processes. 0 means unlimited. Default: 20
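The roles of batchsize and queuesize can be illustrated with a bounded queue between a producer and a consumer. To stay short and portable, the sketch below uses a worker thread instead of a separate process, so it shows only the batching and queueing idea and not the class's actual mechanism. The function `background` is hypothetical.

```python
import queue
import threading

_DONE = object()  # sentinel that marks the end of the source


def background(source, batchsize=500, queuesize=20):
    """Thread-based sketch: rows are handed over in batches via a queue."""
    if batchsize <= 0:
        raise ValueError("batchsize must be positive")
    # As in the description above, a size of 0 means an unbounded queue.
    batches = queue.Queue(maxsize=queuesize)

    def worker():
        batch = []
        for row in source:
            batch.append(row)
            if len(batch) == batchsize:
                batches.put(batch)  # blocks while the queue is full
                batch = []
        if batch:
            batches.put(batch)  # last, possibly smaller, batch
        batches.put(_DONE)

    threading.Thread(target=worker, daemon=True).start()
    while True:
        batch = batches.get()
        if batch is _DONE:
            return
        yield from batch


rows = list(background(({"id": i} for i in range(7)), batchsize=3, queuesize=2))
print(rows)
```

Larger batches reduce the number of hand-overs between producer and consumer, while a bounded queue limits how far the producer can run ahead and therefore how much memory is used.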
- class pygrametl.datasources.RoundRobinSource(sources, batchsize=500)
Bases: object
A source that reads sets of rows from sources in round-robin fashion
Arguments:
sources: a sequence of data sources
batchsize: the number of rows to read from a data source before going to the next data source. Must be positive (to empty a source before going to the next, use UnionSource). Default: 500
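Round-robin reading in batches can be sketched with `itertools.islice`. The generator `round_robin` is a hypothetical stand-in, and how exhausted sources are dropped is a choice made for this sketch.

```python
from itertools import islice


def round_robin(sources, batchsize=500):
    """Read up to batchsize rows from each source in turn until all are empty."""
    if batchsize <= 0:
        raise ValueError("batchsize must be positive")
    iterators = [iter(source) for source in sources]
    while iterators:
        still_active = []
        for iterator in iterators:
            batch = list(islice(iterator, batchsize))
            yield from batch
            # A short batch means the source is exhausted and can be dropped.
            if len(batch) == batchsize:
                still_active.append(iterator)
        iterators = still_active


a = [{"src": "a", "n": 1}, {"src": "a", "n": 2}, {"src": "a", "n": 3}]
b = [{"src": "b", "n": 1}]
rows = list(round_robin([a, b], batchsize=2))
print(rows)
```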
- class pygrametl.datasources.SQLSource(connection, query, names=(), initsql=None, cursorarg=None, parameters=None)
Bases: object
A class for iterating the result set of a single SQL query.
Arguments:
connection: the PEP 249 connection to use. NOT a ConnectionWrapper!
query: the query that generates the result
names: names of attributes in the result. If not set, the names from the database are used. Default: ()
initsql: SQL that is executed before the query. The result of this initsql is not returned. Default: None.
cursorarg: if not None, this argument is used as an argument when the connection’s cursor method is called. Default: None.
parameters: if not None, this sequence or mapping of parameters will be sent when the query is executed. Default: None.
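How a PEP 249 result set can be turned into dicts is sketched below with the standard library's `sqlite3` module. The generator `sql_rows` is hypothetical and leaves out cursorarg. The table `emp` and its contents are made up.

```python
import sqlite3


def sql_rows(connection, query, names=(), initsql=None, parameters=None):
    """Sketch: run a query on a PEP 249 connection and yield one dict per row."""
    cursor = connection.cursor()
    if initsql:
        cursor.execute(initsql)  # its result is not returned
    if parameters is not None:
        cursor.execute(query, parameters)
    else:
        cursor.execute(query)
    # Fall back to the names reported by the database when none are given.
    names = names or [column[0] for column in cursor.description]
    for values in cursor:
        yield dict(zip(names, values))


connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE emp (id INTEGER, salary REAL)")
connection.executemany("INSERT INTO emp VALUES (?, ?)", [(1, 100.0), (2, 250.0)])

well_paid = list(sql_rows(
    connection,
    "SELECT id, salary FROM emp WHERE salary > ? ORDER BY id",
    parameters=(150,),
))
renamed = list(sql_rows(
    connection,
    "SELECT id, salary FROM emp ORDER BY id",
    names=("emp_id", "pay"),
))
print(well_paid, renamed)
```

The `?` placeholder is the parameter style of `sqlite3`. Other PEP 249 drivers may expect a different style, such as `%s` or named parameters.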
- class pygrametl.datasources.SQLTransformingSource(source, temptablename, query, additionalcasts=None, batchsize=10000, perbatch=False, columnnames=None, usetruncate=True, targetconnection=None)
Bases: object
A source that transforms rows from another source by loading them into a temporary table in an RDBMS and then retrieving them using an SQL query.
Warning
Creates, empties, and drops the temporary table.
Arguments:
source: a data source that yields rows with the same schema, i.e., they contain the same columns and the columns’ types do not change
temptablename: a string with the name of the temporary table to use. This table must use the same schema as the rows from source
query: the query that is executed on temptablename in targetconnection or in an in-memory SQLite database to transform the rows from source
additionalcasts: a dict with additional casts from Python types to SQL types, given as strings, that take precedence over the defaults. Default: None, i.e., only int, float, and str are mapped to simple SQL types that should be supported by most RDBMSs
batchsize: an int deciding how many insert operations should be done in one batch. Default: 10000
perbatch: a boolean deciding if the transformation should be applied for each batch or for all rows in source. Default: False, i.e., the transformation is applied once for all rows in source
columnnames: a sequence of column names to use for transformed rows. Default: None, i.e., the column names from query are used
usetruncate: a boolean deciding if TRUNCATE should be used instead of DELETE FROM when emptying temptablename in targetconnection. Default: True, i.e., TRUNCATE is used instead of DELETE FROM
targetconnection: the PEP 249 connection to use, the ConnectionWrapper to use, or None. If None, a new temporary in-memory SQLite database is created
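The load-then-query idea can be sketched with an in-memory SQLite database from the standard library. The helper `sql_transform`, the type mapping `SQL_TYPES`, and the table name `sales_tmp` are assumptions for illustration. Batching, perbatch, columnnames, and TRUNCATE handling are left out.

```python
import sqlite3

# Assumed mapping from Python types to SQLite column types.
SQL_TYPES = {int: "INTEGER", float: "REAL", str: "TEXT"}


def sql_transform(source, temptablename, query):
    """Sketch: load rows into a temporary table, then return the query result."""
    rows = list(source)
    if not rows:
        return []
    columns = list(rows[0])
    connection = sqlite3.connect(":memory:")
    # Table and column names are interpolated directly, so they must be trusted.
    declarations = ", ".join(
        f"{column} {SQL_TYPES[type(rows[0][column])]}" for column in columns
    )
    connection.execute(f"CREATE TABLE {temptablename} ({declarations})")
    placeholders = ", ".join("?" for _ in columns)
    connection.executemany(
        f"INSERT INTO {temptablename} VALUES ({placeholders})",
        ([row[column] for column in columns] for row in rows),
    )
    cursor = connection.execute(query)
    names = [column[0] for column in cursor.description]
    result = [dict(zip(names, values)) for values in cursor.fetchall()]
    connection.execute(f"DROP TABLE {temptablename}")
    connection.close()
    return result


sales = [
    {"store": "A", "amount": 10},
    {"store": "B", "amount": 7},
    {"store": "A", "amount": 5},
]
totals = sql_transform(
    sales,
    "sales_tmp",
    "SELECT store, SUM(amount) AS total FROM sales_tmp GROUP BY store ORDER BY store",
)
print(totals)
```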
- class pygrametl.datasources.TransformingSource(source, *transformations)
Bases: object
A source that applies functions to the rows from another source
Arguments:
source: a data source
*transformations: the transformations to apply. Must be callables of the form func(row) where row is a dict. Will be applied in the given order.
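Applying transformations in order can be sketched as below. The sketch assumes that each transformation modifies the row dict in place and that its return value is ignored. The generator `transforming` and the two example functions are hypothetical.

```python
def transforming(source, *transformations):
    """Apply each func(row) in the given order, then pass the row on."""
    for row in source:
        for func in transformations:
            func(row)  # assumed to modify the row in place
        yield row


def add_total(row):
    row["total"] = row["price"] * row["quantity"]


def drop_price(row):
    del row["price"]


rows = list(transforming([{"price": 2.5, "quantity": 4}], add_total, drop_price))
print(rows)
```

The order matters here: `drop_price` placed before `add_total` would raise a KeyError because the price is already gone.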
- class pygrametl.datasources.TypedCSVSource(f, casts, fieldnames=None, restkey=None, restval=None, dialect='excel', *args, **kwds)
Bases: DictReader
A class for iterating a CSV file and type casting the values.
Arguments:
f: An iterable object such as a file. Passed on to csv.DictReader
casts: A dict mapping from attribute names to functions to apply to the values of these attributes, e.g., {'id': int, 'salary': float}
fieldnames: Passed on to csv.DictReader
restkey: Passed on to csv.DictReader
restval: Passed on to csv.DictReader
dialect: Passed on to csv.DictReader
*args: Passed on to csv.DictReader
**kwds: Passed on to csv.DictReader
- next()
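Reading a CSV file and casting selected attributes can be sketched on top of `csv.DictReader`. The generator `typed_csv` is a hypothetical stand-in for the class, and the sample data is made up.

```python
import csv
import io


def typed_csv(f, casts, **kwds):
    """Read rows with csv.DictReader and cast the attributes named in casts."""
    for row in csv.DictReader(f, **kwds):
        for att, cast in casts.items():
            row[att] = cast(row[att])
        yield dict(row)


csvfile = io.StringIO("id,name,salary\n1,Ann,1000.5\n")
rows = list(typed_csv(csvfile, {"id": int, "salary": float}))
print(rows)
```

Attributes that are not named in casts, such as `name` here, keep the string value that `csv.DictReader` produced.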
- class pygrametl.datasources.UnionSource(*sources)
Bases: object
A source to union other sources (possibly with different types of rows). All rows are read from the first source before rows are read from the second source, and so on (to interleave the rows, use a RoundRobinSource).
Arguments:
*sources: The sources to union in the order they should be used.
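The ordering described above matches what `itertools.chain` from the standard library does for plain iterables, so it can serve as a sketch of the behavior. The two example sources are made up.

```python
from itertools import chain

first = [{"id": 1}, {"id": 2}]
second = [{"name": "Ann"}]  # rows may have different attributes

# All rows of `first` are produced before any row of `second`.
rows = list(chain(first, second))
print(rows)
```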