API Reference
- padawan.scan_parquet(path)
Read partitioned data from disk.
- Parameters:
path (str) – Path to a directory holding the partitioned data. Each file under path ending in .parquet will become a partition of the dataset. Metadata about the partitions, such as the schema, partition sizes and bounds, may be stored in special files named _padawan_metadata.json and _padawan_schema. These files are created automatically when the data is written with padawan.Dataset.write_parquet(). If they are not present, the resulting padawan.Dataset will have unknown bounds and sizes.
- Returns:
A dataset representing the data under path.
- Return type:
padawan.Dataset
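This plain-Python sketch illustrates which files under path become partitions. It assumes a recursive search and a sorted file order, and neither is specified above. list_partition_files is a hypothetical helper, not part of padawan. The metadata files do not end in .parquet, so they are not picked up as partitions.

```python
from pathlib import Path
import tempfile

def list_partition_files(path):
    """Return the files under `path` that would become partitions.

    Hypothetical helper: the recursive search and sorted order are assumptions.
    """
    return sorted(p for p in Path(path).rglob("*.parquet") if p.is_file())

with tempfile.TemporaryDirectory() as d:
    # Two data files plus the two metadata files described above.
    for name in ["part-0.parquet", "part-1.parquet",
                 "_padawan_metadata.json", "_padawan_schema"]:
        (Path(d) / name).touch()
    names = [p.name for p in list_partition_files(d)]
    print(names)  # ['part-0.parquet', 'part-1.parquet']
```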
- padawan.from_polars(data, index_columns=())
Create a single-partition dataset from a polars DataFrame.
- Parameters:
data (polars.DataFrame or polars.LazyFrame) – The polars DataFrame object to use as the (single) partition. If this is a polars.LazyFrame object, it will be collected.
index_columns (tuple of str, optional) – The columns to use as index. Defaults to the empty tuple, i.e. no columns are designated as index.
- Returns:
A single-partition dataset containing data.
- Return type:
padawan.Dataset
- padawan.concat(datasets)
Concatenate multiple datasets.
- Parameters:
datasets (list of padawan.Dataset) – The datasets to concatenate. All datasets must have the same index columns and the same schema.
- Returns:
The concatenated dataset.
- Return type:
padawan.Dataset
- class padawan.Dataset(npartitions, index_columns=(), sizes=None, lower_bounds=None, upper_bounds=None, schema=None)
Abstract base class for datasets.
This class has the semantics of a list of polars.LazyFrame objects. For an instance ds of this class you can use len(ds) to get the number of partitions, ds[i] to access the i-th partition and for part in ds to iterate over partitions.
Usually you will use padawan.scan_parquet() to create Dataset instances, so you will not need to instantiate this class or any of its derived classes directly.
- collate(rows_per_partition)
Merge partitions to get a certain minimum number of rows per partition.
The partition sizes and bounds must be known to use this method. You can call padawan.Dataset.reindex() first to compute them.
This method does not split existing partitions. Use padawan.Dataset.repartition() for better (but computationally more expensive) control over partition sizes.
- Parameters:
rows_per_partition (int) – The desired minimum number of rows per partition.
- Returns:
A dataset with the desired minimum number of rows per partition.
- Return type:
padawan.Dataset
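This sketch shows one way the partition sizes could be grouped so that each merged partition reaches the requested minimum without splitting any partition. The greedy left-to-right strategy and the folding of a short trailing group into its predecessor are assumptions. collate_groups is a hypothetical helper, not padawan's implementation.

```python
def collate_groups(sizes, rows_per_partition):
    """Group adjacent partition indices so each group has at least
    rows_per_partition rows. Hypothetical sketch; never splits a partition.
    """
    groups, current, current_rows = [], [], 0
    for i, size in enumerate(sizes):
        current.append(i)
        current_rows += size
        if current_rows >= rows_per_partition:
            groups.append(current)
            current, current_rows = [], 0
    if current:
        # Assumption: a short trailing group is merged into the previous one.
        if groups:
            groups[-1].extend(current)
        else:
            groups.append(current)
    return groups

print(collate_groups([40, 70, 10, 30, 120, 5], 100))  # [[0, 1], [2, 3, 4, 5]]
```

Because partitions are only merged, never split, a single oversized partition simply stays as its own group.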
- collect(parallel=False, progress=False)
Pull all data into memory.
- Parameters:
parallel (bool or int, optional) – Specifies how to parallelize the computation. See the corresponding argument of padawan.Dataset.write_parquet() for details. Defaults to False.
progress (callable, str, int, bool or tuple, optional) – Whether and how to print progress messages. See the corresponding argument of padawan.Dataset.write_parquet() for details. Defaults to False.
- Returns:
A single dataframe with all partitions concatenated.
- Return type:
polars.DataFrame
- property index_columns
A tuple of strings with the index columns of the dataset.
padawan tries to keep track of the upper and lower bounds of the index columns in each partition. The padawan.Dataset.slice() and padawan.Dataset.join() methods implement slicing and joining on the current index columns.
- join(other, how='inner')
Join with another dataset.
- Parameters:
other (padawan.Dataset) – The dataset to join. self and other must have the same index columns, and the join is done on those columns. You can use padawan.Dataset.reindex() and padawan.Dataset.rename() to give both datasets index columns with the same name.
how (str, optional) – The type of join to perform. Supported values are 'inner', 'left' and 'outer'. Defaults to 'inner'.
- Returns:
The joined dataset.
- Return type:
padawan.Dataset
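Both datasets share the same index columns, so known partition bounds could let a join skip partition pairs whose index ranges cannot overlap. This plain-Python sketch illustrates that idea. It is an assumption about why bounds are useful here, not a description of padawan's join strategy, and overlapping_pairs is a hypothetical helper. For 'left' or 'outer' joins, partitions without any overlapping partner would still have to be carried through.

```python
def overlapping_pairs(left_bounds, right_bounds):
    """Return (i, j) pairs whose closed index ranges [lower, upper] intersect.

    Each argument is a list of (lower, upper) tuples, one per partition;
    tuples compare lexicographically, matching the bounds ordering.
    """
    pairs = []
    for i, (llo, lhi) in enumerate(left_bounds):
        for j, (rlo, rhi) in enumerate(right_bounds):
            if llo <= rhi and rlo <= lhi:
                pairs.append((i, j))
    return pairs

left = [((1,), (4,)), ((5,), (9,))]
right = [((3,), (6,)), ((10,), (12,))]
print(overlapping_pairs(left, right))  # [(0, 0), (1, 0)]
```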
- property known_bounds
True if the partition bounds are known.
- property known_schema
True if the table schema is known.
- property known_sizes
True if the partition sizes (number of rows) are known.
- property lower_bounds
A tuple holding the lower bounds for each partition.
The length of lower_bounds is equal to the number of partitions. Each lower bound is a tuple with the same length as padawan.Dataset.index_columns. Bounds are computed by constructing tuples from the index columns and using lexicographic ordering. So, a partition with index columns ('a', 'b') and data

  a  b
  2  3
  1  2
  2  1

will have the lower bound (1, 2), since the second row would become the first when the data is sorted by (a, b).
- map(func, extra_args=None, shared_args=None, index_columns=None, schema=None, preserves='none')
Apply a function to all partitions.
- Parameters:
func (callable) –
The function to apply. It should return a polars.DataFrame or polars.LazyFrame and support the following signature:
func(part, extra_arg_1, extra_arg_2, ..., shared_arg_1, shared_arg_2, ...)
where part is a polars.LazyFrame with the partition data, extra_arg_1 etc. are partition-specific arguments specified via extra_args (see below) and shared_arg_1 etc. are shared arguments specified via shared_args (see below).
extra_args (list of tuples, optional) – Extra partition-specific arguments passed to func. The length of the list must equal the number of partitions. Each tuple in the list is unpacked and passed as additional arguments to the function call for the corresponding partition. Defaults to None, in which case no extra arguments are passed.
shared_args (tuple, optional) – This tuple is unpacked and passed as extra arguments to every call of func. Defaults to None, in which case no shared arguments are passed.
index_columns (tuple of str, optional) – The new index columns to use after func is applied. (Note that func may change the schema of the dataset, so the old index columns might not exist anymore after func is applied.) Defaults to None, in which case the old index columns are used.
schema (dict, optional) – The schema of the dataset after the map. Defaults to None, in which case the schema will be unknown.
preserves (str, optional) –
Specifies which part of the metadata is preserved by func. Possible values are:
- 'none'
No metadata is preserved.
- 'sizes'
Partition sizes (number of rows) are preserved.
- 'bounds'
Partition bounds (self.lower_bounds and self.upper_bounds) are preserved.
- 'all'
Both partition sizes and bounds are preserved.
Defaults to 'none'. Note that the behaviour of func is not checked. If you specify preserves='bounds' but func actually changes the bounds, this will lead to incorrect behaviour downstream.
- Returns:
A dataset with func applied to each partition.
- Return type:
padawan.Dataset
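This sketch shows the argument-unpacking convention for func. Each call receives the partition, then that partition's extra_args tuple, then the shared_args tuple. Lists of integers stand in for polars.LazyFrame partitions. apply_map is a hypothetical helper that mimics the documented call convention, not padawan's implementation.

```python
def apply_map(partitions, func, extra_args=None, shared_args=None):
    """Call func on each partition following the documented convention."""
    if extra_args is None:
        extra_args = [()] * len(partitions)
    if len(extra_args) != len(partitions):
        raise ValueError("extra_args must contain one tuple per partition")
    shared = tuple(shared_args) if shared_args is not None else ()
    # func(part, *partition-specific args, *shared args)
    return [func(part, *extra, *shared) for part, extra in zip(partitions, extra_args)]

def scale_and_shift(part, factor, offset):
    # factor comes from extra_args (per partition); offset from shared_args.
    return [x * factor + offset for x in part]

parts = [[1, 2], [3]]
print(apply_map(parts, scale_and_shift, extra_args=[(10,), (100,)], shared_args=(1,)))
# [[11, 21], [301]]
```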
- reindex(index_columns=None, collect_stats=True, parallel=False, progress=False)
Set index columns and compute partition sizes and bounds.
- Parameters:
index_columns (tuple of str, optional) –
The columns to use as index. Defaults to None, in which case partition sizes and bounds are computed for the current index columns.
If index_columns is a truncation of the current index columns (e.g. self.index_columns is ('a', 'b', 'c') and index_columns is ('a', 'b')), the new partition bounds can and will be computed purely from the metadata, without loading any partitions into memory.
collect_stats (bool, optional) – Whether to compute the sizes and index bounds of the partitions if they are not known. Defaults to True.
parallel (bool or int, optional) – Specifies how to parallelize the computation. See the corresponding argument of padawan.Dataset.write_parquet() for details. Defaults to False.
progress (callable, str, int, bool or tuple, optional) – Whether and how to print progress messages. See the corresponding argument of padawan.Dataset.write_parquet() for details. Defaults to False.
- Returns:
The reindexed dataset.
- Return type:
padawan.Dataset
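Truncated bounds can come from metadata alone because lexicographic ordering compares the leading components first. The row with the smallest full index tuple therefore also has the smallest prefix, and the same holds for the largest. This sketch checks the property on plain tuples. truncate_bounds is a hypothetical helper.

```python
def truncate_bounds(bounds, k):
    """Keep only the first k components of each bound tuple."""
    return tuple(b[:k] for b in bounds)

# Index tuples (a, b, c) of one partition.
rows = [(1, 5, 0), (1, 2, 9), (2, 0, 3)]
lower_abc, upper_abc = min(rows), max(rows)   # bounds for ('a', 'b', 'c')

# Truncating the stored bounds equals recomputing them on the prefix (a, b).
assert lower_abc[:2] == min(r[:2] for r in rows)
assert upper_abc[:2] == max(r[:2] for r in rows)
print(truncate_bounds((lower_abc, upper_abc), 2))  # ((1, 2), (2, 0))
```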
- rename(mapping)
Rename columns of the dataset.
- Parameters:
mapping (dict) – A dictionary mapping old column names to the new ones.
- Returns:
The dataset with renamed columns.
- Return type:
padawan.Dataset
- repartition(rows_per_partition, sample_fraction=1.0, parallel=False, progress=False, base_seed=10, seed_increment=10)
Repartition the dataset.
The data is partitioned so that rows with the same values for the index columns appear in the same partition.
- Parameters:
rows_per_partition (int) – The desired number of rows per partition.
sample_fraction (float, optional) – The fraction of rows of the full dataset that will be sampled in order to determine the new partition boundaries. Defaults to 1, in which case all rows are processed. You need to reduce this in cases where the index columns for the full dataset cannot be stored in memory.
parallel (bool or int, optional) – Specifies how to parallelize the computation. See the corresponding argument of padawan.Dataset.write_parquet() for details. Defaults to False.
progress (callable, str, int, bool or tuple, optional) – Whether and how to print progress messages. See the corresponding argument of padawan.Dataset.write_parquet() for details. Defaults to False.
base_seed (int, optional) – The random seed used to sample rows from the first partition of self. Defaults to 10.
seed_increment (int, optional) – For every subsequent partition, the random seed is incremented by seed_increment. Defaults to 10.
- Returns:
The repartitioned dataset.
- Return type:
padawan.Dataset
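From the parameter descriptions, partition i is sampled with seed base_seed + i * seed_increment. The sketch below follows that seed schedule and then picks split keys from the sorted sample. The boundary-selection step, in which every k-th sampled key starts a new partition, is an assumed technique and not necessarily what padawan does. partition_seeds and sample_boundaries are hypothetical helpers, and index tuples stand in for the index columns.

```python
import random

def partition_seeds(npartitions, base_seed=10, seed_increment=10):
    # Seed for partition i, following the base_seed / seed_increment docs.
    return [base_seed + i * seed_increment for i in range(npartitions)]

def sample_boundaries(partitions, rows_per_partition, sample_fraction=1.0,
                      base_seed=10, seed_increment=10):
    """Choose split keys from a sample of index tuples (hypothetical sketch)."""
    seeds = partition_seeds(len(partitions), base_seed, seed_increment)
    sample = []
    for part, seed in zip(partitions, seeds):
        rng = random.Random(seed)
        k = round(len(part) * sample_fraction)
        sample.extend(rng.sample(part, k))
    sample.sort()
    # Each sampled key stands in for about 1 / sample_fraction real rows.
    step = max(1, round(rows_per_partition * sample_fraction))
    # A boundary is the first key of a new partition. Deduplicating keeps rows
    # with equal keys together, since rows are assigned by key comparison.
    return sorted(set(sample[step::step]))

parts = [[(5,), (1,), (9,)], [(3,), (7,), (2,)]]
print(partition_seeds(2))           # [10, 20]
print(sample_boundaries(parts, 2))  # [(3,), (7,)]
```

With sample_fraction=1.0 every row is sampled, so the result does not depend on the seeds. Smaller fractions trade boundary accuracy for memory, as described for sample_fraction above.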
- property schema
A dict mapping column names to polars data types.
Will be None if the schema is not known.
- property sizes
A tuple of integers holding the number of rows for each partition.
None if the partition sizes are not known.
- slice(lb=None, ub=None, inclusive='lower')
Take a slice of the dataset using the current index columns.
- Parameters:
lb (tuple, optional) – A tuple with the same length as self.index_columns specifying the lower bound of the slice. Defaults to None, in which case there is no lower bound.
ub (tuple, optional) – A tuple with the same length as self.index_columns specifying the upper bound of the slice. Defaults to None, in which case there is no upper bound.
inclusive (str, optional) – Specifies which of the bounds are inclusive. Allowed values are 'none', 'lower', 'upper' or 'both'. Defaults to 'lower'.
- Returns:
The slice of the dataset.
- Return type:
padawan.Dataset
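This sketch illustrates the inclusive options. It assumes slice compares index tuples lexicographically, the same ordering used for the bounds, so the default 'lower' selects the half-open range from lb up to but excluding ub. in_slice is a hypothetical row-level predicate, not padawan's code.

```python
import operator

def in_slice(key, lb=None, ub=None, inclusive="lower"):
    """True if index tuple `key` lies inside the slice (hypothetical helper)."""
    if inclusive not in ("none", "lower", "upper", "both"):
        raise ValueError(f"invalid inclusive value: {inclusive!r}")
    lower_op = operator.ge if inclusive in ("lower", "both") else operator.gt
    upper_op = operator.le if inclusive in ("upper", "both") else operator.lt
    if lb is not None and not lower_op(key, lb):
        return False
    if ub is not None and not upper_op(key, ub):
        return False
    return True

keys = [(1, 0), (1, 5), (2, 0), (2, 3), (3, 1)]
print([k for k in keys if in_slice(k, lb=(1, 5), ub=(2, 3))])
# [(1, 5), (2, 0)]
print([k for k in keys if in_slice(k, lb=(1, 5), ub=(2, 3), inclusive="both")])
# [(1, 5), (2, 0), (2, 3)]
```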
- property upper_bounds
A tuple holding the upper bounds for each partition.
The length of upper_bounds is equal to the number of partitions. Each upper bound is a tuple with the same length as padawan.Dataset.index_columns. Bounds are computed by constructing tuples from the index columns and using lexicographic ordering. So, a partition with index columns ('a', 'b') and data

  a  b
  2  1
  1  3
  1  2

will have the upper bound (2, 1), since the first row would become the last when the data is sorted by (a, b).
- write_parquet(path, append=False, parallel=False, progress=False)
Write the dataset to disk.
- Parameters:
path (str) – The directory that will contain the parquet files. If a file or directory with that name exists (and append is False), it will be deleted first.
append (bool, optional) – If True, the partitions are appended to an existing dataset in path. In this case you must make sure that path is an existing directory containing valid padawan metadata files (e.g. by writing to the same path with write_parquet(path, append=False) first). Defaults to False.
parallel (bool or int, optional) –
Specifies how to parallelize the computation. Defaults to False. The possible values are:
- parallel = True
Use all available CPUs.
- parallel = False
No parallelism.
- parallel > 1
Use parallel number of CPUs.
- parallel in [0, 1]
No parallelism.
- parallel = -n < 0
Use the number of available CPUs minus n.
progress (callable, str, int, bool or tuple, optional) –
Whether and how to print progress messages about the computation. Defaults to False. The possible values are:
- False
No progress messages are printed.
- True
A default progress message is printed after each completed partition.
- format string
A custom message is printed after each completed partition. The following format variables can be used:
- completed (int)
The number of completed partitions.
- total (int)
The total number of partitions.
- remaining (int)
The remaining number of partitions.
- start (str)
The starting time of the computation in ISO format.
- finish (str)
The expected finishing time of the computation in ISO format.
- telapsed (str)
The elapsed time of the computations.
- tremaining (str)
The expected time to finish.
- ttotal (str)
The expected total time of the computation.
- integer n
Print the default message only after every n completed partitions.
- tuple of the form (msg, n)
Print a custom message after every n completed partitions.
- callable
Call a custom function after every completed partition. The function must accept the following arguments:
- completed (int)
The number of completed partitions.
- total (int)
The total number of partitions.
- start (datetime.datetime)
The starting time of the computation.
- finish (datetime.datetime)
The expected finishing time of the computation.
- Returns:
None if append=False. Otherwise the dataset that was written is returned, as if it was read back in with padawan.scan_parquet().
- Return type:
padawan.Dataset or None
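The parallel and progress arguments are shared by collect(), reindex(), repartition() and write_parquet(). The sketch below restates the documented parallel rules as a function and then shows a custom format string and a progress callback. resolve_workers and report are hypothetical, and so is clamping the worker count to at least one. The sketch also assumes that format strings are filled with str.format, which the reference does not state.

```python
import datetime

def resolve_workers(parallel, available_cpus):
    """Translate the documented `parallel` values into a worker count (sketch)."""
    # Check bool first: in Python True == 1, which would otherwise land in
    # the "parallel in [0, 1]" branch.
    if isinstance(parallel, bool):
        return available_cpus if parallel else 1
    if parallel > 1:
        return parallel
    if parallel >= 0:
        return 1  # 0 or 1: no parallelism
    # parallel = -n: all CPUs minus n; clamping at 1 is an assumption.
    return max(1, available_cpus + parallel)

# A custom format string may use any subset of the documented variables.
fmt = "{completed}/{total} partitions done, {tremaining} remaining"
message = fmt.format(completed=3, total=10, remaining=7, tremaining="0:01:10")
print(message)  # 3/10 partitions done, 0:01:10 remaining

def report(completed, total, start, finish):
    # Receives the documented arguments; start and finish are datetimes.
    elapsed = datetime.datetime.now() - start
    print(f"{completed}/{total} partitions after {elapsed}, "
          f"expected finish {finish:%H:%M:%S}")

start = datetime.datetime.now() - datetime.timedelta(seconds=30)
report(3, 10, start, start + datetime.timedelta(seconds=100))

# Hypothetical usage with an existing dataset `ds`:
# ds.write_parquet("out_dir", parallel=-1, progress=report)
# ds.write_parquet("out_dir", progress=(fmt, 5))  # message every 5 partitions
```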