Python API Reference

reladiff.connect(db_conf: str | dict, thread_count: int | None = 1, shared: bool = True) Database

Connect to a database using the given database configuration.

Configuration can be given either as a URI string, or as a dict of {option: value}.

The dictionary configuration uses the same keys as the TOML ‘database’ definition given with –conf.

thread_count determines the max number of worker threads per database, if relevant. None means no limit.

Parameters:
  • db_conf (str | dict) – The configuration for the database to connect. URI or dict.

  • thread_count (int, optional) – Size of the threadpool. Ignored by cloud databases. (default: 1)

  • shared (bool) – Whether to cache and return the same connection for the same db_conf. (default: True)

Note: For non-cloud databases, a low thread-pool size may be a performance bottleneck.

Supported drivers: - postgresql - mysql - oracle - snowflake - bigquery - redshift - presto - databricks - trino - clickhouse - vertica

Example

>>> connect("mysql://localhost/db")
<sqeleton.databases.mysql.MySQL object at ...>
>>> connect({"driver": "mysql", "host": "localhost", "database": "db"})
<sqeleton.databases.mysql.MySQL object at ...>
reladiff.connect_to_table(db_info: str | dict | AbstractDatabase, table_name: Tuple[str, ...] | str, key_columns: str | Sequence[str] = ('id',), thread_count: int | None = 1, **kwargs) TableSegment

Connects to the given database, and creates a TableSegment instance

Parameters:
  • db_info – Either a URI string, dict of connection options or a reladiff AbstractDatabase type.

  • table_name – Name of the table as a string, or a tuple that signifies the path.

  • key_columns – Names of the key columns

  • thread_count – Number of threads for this connection (only if using a threadpooled db implementation)

See also

connect()

reladiff.diff_tables(table1: TableSegment, table2: TableSegment, *, key_columns: Sequence[str] | None = None, update_column: str | None = None, extra_columns: Tuple[str, ...] | None = None, min_key: Vector | None = None, max_key: Vector | None = None, min_update: datetime | None = None, max_update: datetime | None = None, threaded: bool = True, max_threadpool_size: int | None = 1, algorithm: Algorithm = Algorithm.AUTO, where: str | None = None, bisection_factor: int = 32, bisection_threshold: int = 16384, validate_unique_key: bool = True, sample_exclusive_rows: bool = False, materialize_to_table: str | Tuple[str, ...] | None = None, materialize_all_rows: bool = False, table_write_limit: int = 1000, allow_empty_tables: bool = False, skip_sort_results: bool = False) DiffResultWrapper

Finds the diff between table1 and table2.

Parameters:
  • key_columns (Tuple[str, ...]) – Name of the key column, which uniquely identifies each row (usually id)

  • update_column (str, optional) – Name of updated column, which signals that rows changed. Usually updated_at or last_update. Used by min_update and max_update.

  • extra_columns (Tuple[str, ...], optional) – Extra columns to compare

  • min_key (Vector, optional) – Lowest key value, used to restrict the segment

  • max_key (Vector, optional) – Highest key value, used to restrict the segment

  • min_update (DbTime, optional) – Lowest update_column value, used to restrict the segment

  • max_update (DbTime, optional) – Highest update_column value, used to restrict the segment

  • threaded (bool) – Enable/disable threaded diffing. Needed to take advantage of database threads.

  • max_threadpool_size (int) – Maximum size of each threadpool. None means auto. Only relevant when threaded is True. There may be many pools, so number of actual threads can be a lot higher. (Note: For best performance, we recommend setting this to at least twice the thread_count argument provided to the driver through connect()/connect_to_table().

  • where (str, optional) – An additional ‘where’ expression to restrict the search space.

  • algorithm (Algorithm) – Which diffing algorithm to use (HASHDIFF or JOINDIFF. Default=`AUTO`)

  • bisection_factor (int) – Into how many segments to bisect per iteration. (Used when algorithm is HASHDIFF)

  • bisection_threshold (Number) – Minimal row count of segment to bisect, otherwise download and compare locally. (Used when algorithm is HASHDIFF).

  • validate_unique_key (bool) – Enable/disable validating that the key columns are unique (JOINDIFF). Enable/disable support for duplicate rows, offering a small performance gain (HASHDIFF). (default: True) Single query, and can’t be threaded, so it’s very slow on non-cloud dbs. Future versions will detect UNIQUE constraints in the schema.

  • sample_exclusive_rows (bool) – Enable/disable sampling of exclusive rows. Creates a temporary table. (used for JOINDIFF. default: False)

  • materialize_to_table (Union[str, DbPath], optional) – Path of new table to write diff results to. Disabled if not provided. Used for JOINDIFF.

  • materialize_all_rows (bool) – Materialize every row, not just those that are different. (used for JOINDIFF. default: False)

  • table_write_limit (int) – Maximum number of rows to write when materializing, per thread.

  • allow_empty_tables (bool) – If false, diffing on empty tables raises an EmptyTable(ValueError) exception.

  • skip_sort_results (bool) – Skip sorting the hashdiff output by key for better performance. (used for HASHDIFF. default: False)

Note

The following parameters are used to override the corresponding attributes of the given TableSegment instances: key_columns, update_column, extra_columns, min_key, max_key, where. If different values are needed per table, it’s possible to omit them here, and instead set them directly when creating each TableSegment.

Note

It is recommended to call .close() on the returned object when done, to release thread-pool. Alternatively, you may use it as a context manager.

Example

>>> table1 = connect_to_table('postgresql:///', 'Rating', 'id')
>>> list(diff_tables(table1, table1))
[]
>>> with diff_tables(table1, table1) as diff:
...     print(list(diff))
[]
class reladiff.HashDiffer(threaded: bool = True, max_threadpool_size: int | None = 1, stats: dict = <factory>, allow_empty_tables: bool = False, bisection_factor: int = 32, bisection_threshold: ~numbers.Number = 16384, skip_sort_results: bool = False, duplicate_rows_support: bool = True)

Finds the diff between two SQL tables

The algorithm uses hashing to quickly check if the tables are different, and then applies a bisection search recursively to find the differences efficiently.

Works best for comparing tables that are mostly the same, with minor discrepancies.

Parameters:
  • bisection_factor (int) – Into how many segments to bisect per iteration.

  • bisection_threshold (Number) – When should we stop bisecting and compare locally (in row count).

  • threaded (bool) – Enable/disable threaded diffing. Needed to take advantage of database threads.

  • max_threadpool_size (int) – Maximum size of each threadpool. None means auto. Only relevant when threaded is True. There may be many pools, so number of actual threads can be a lot higher.

  • skip_sort_results (bool) – Skip sorting the hashdiff output by key for better performance. Entries with the same key but different column values may not appear adjacent in the output.

  • duplicate_rows_support (bool) – If True, the algorithm will support duplicate rows in the tables.

__init__(threaded: bool = True, max_threadpool_size: int | None = 1, stats: dict = <factory>, allow_empty_tables: bool = False, bisection_factor: int = 32, bisection_threshold: ~numbers.Number = 16384, skip_sort_results: bool = False, duplicate_rows_support: bool = True) None
diff_tables(table1: TableSegment, table2: TableSegment, *, info_tree: InfoTree | None = None) DiffResultWrapper

Diff the given tables.

Parameters:
  • table1 (TableSegment) – The “before” table to compare. Or: source table

  • table2 (TableSegment) – The “after” table to compare. Or: target table

Returns:

An iterator that yield pair-tuples, representing the diff. Items can be either - (‘-’, row) for items in table1 but not in table2. (‘+’, row) for items in table2 but not in table1. Where row is a tuple of values, corresponding to the diffed columns.

class reladiff.JoinDiffer(threaded: bool = True, max_threadpool_size: int | None = 1, stats: dict = <factory>, allow_empty_tables: bool = False, validate_unique_key: bool = True, sample_exclusive_rows: bool = False, materialize_to_table: ~typing.Tuple[str, ...] | None = None, materialize_all_rows: bool = False, table_write_limit: int = 1000)

Finds the diff between two SQL tables in the same database, using JOINs.

The algorithm uses an OUTER JOIN (or equivalent) with extra checks and statistics. The two tables must reside in the same database, and their primary keys must be unique and not null.

All parameters are optional.

Parameters:
  • threaded (bool) – Enable/disable threaded diffing. Needed to take advantage of database threads.

  • max_threadpool_size (int) – Maximum size of each threadpool. None means auto. Only relevant when threaded is True. There may be many pools, so number of actual threads can be a lot higher.

  • validate_unique_key (bool) – Enable/disable validating that the key columns are unique. (default: True) If there are no UNIQUE constraints in the schema, it is done in a single query, and can’t be threaded, so it’s very slow on non-cloud dbs.

  • sample_exclusive_rows (bool) – Enable/disable sampling of exclusive rows. (default: False) Creates a temporary table.

  • materialize_to_table (DbPath, optional) – Path of new table to write diff results to. Disabled if not provided.

  • materialize_all_rows (bool) – Materialize every row, not just those that are different. (default: False)

  • table_write_limit (int) – Maximum number of rows to write when materializing, per thread.

__init__(threaded: bool = True, max_threadpool_size: int | None = 1, stats: dict = <factory>, allow_empty_tables: bool = False, validate_unique_key: bool = True, sample_exclusive_rows: bool = False, materialize_to_table: ~typing.Tuple[str, ...] | None = None, materialize_all_rows: bool = False, table_write_limit: int = 1000) None
diff_tables(table1: TableSegment, table2: TableSegment, *, info_tree: InfoTree | None = None) DiffResultWrapper

Diff the given tables.

Parameters:
  • table1 (TableSegment) – The “before” table to compare. Or: source table

  • table2 (TableSegment) – The “after” table to compare. Or: target table

Returns:

An iterator that yield pair-tuples, representing the diff. Items can be either - (‘-’, row) for items in table1 but not in table2. (‘+’, row) for items in table2 but not in table1. Where row is a tuple of values, corresponding to the diffed columns.

class reladiff.TableSegment(database: ~sqeleton.databases.base.Database = <object object>, table_path: ~typing.Tuple[str, ...] = <object object>, key_columns: ~typing.Tuple[str, ...] = <object object>, update_column: str | None = None, extra_columns: ~typing.Tuple[str, ...] = (), min_key: ~reladiff.utils.Vector | None = None, max_key: ~reladiff.utils.Vector | None = None, min_update: ~datetime.datetime | None = None, max_update: ~datetime.datetime | None = None, where: str | None = None, case_sensitive: bool = True, _schema: ~sqeleton.utils.CaseAwareMapping | None = None)

Signifies a segment of rows (and selected columns) within a table

Parameters:
  • database (Database) – Database instance. See connect()

  • table_path (DbPath) – Path to table in form of a tuple. e.g. (‘my_dataset’, ‘table_name’)

  • key_columns (Tuple[str]) – Name of the key column, which uniquely identifies each row (usually id)

  • update_column (str, optional) – Name of updated column, which signals that rows changed. Usually updated_at or last_update. Used by min_update and max_update.

  • extra_columns (Tuple[str, ...], optional) – Extra columns to compare

  • min_key (Vector, optional) – Lowest key value, used to restrict the segment

  • max_key (Vector, optional) – Highest key value, used to restrict the segment

  • min_update (DbTime, optional) – Lowest update_column value, used to restrict the segment

  • max_update (DbTime, optional) – Highest update_column value, used to restrict the segment

  • where (str, optional) – An additional ‘where’ expression to restrict the search space.

  • case_sensitive (bool) – If false, the case of column names will adjust according to the schema. Default is true.

with_schema(refine: bool = True, allow_empty_table: bool = False) TableSegment

Queries the table schema from the database, and returns a new instance of TableSegment, with a schema.

get_values() list

Download all the relevant values of the segment from the database

choose_checkpoints(count: int) List[List[int | str | bytes | ArithUUID | ArithAlphanumeric]]

Suggests a bunch of evenly-spaced checkpoints to split by, including start, end.

segment_by_checkpoints(checkpoints: List[List[int | str | bytes | ArithUUID | ArithAlphanumeric]]) List[TableSegment]

Split the current TableSegment to a bunch of smaller ones, separated by the given checkpoints

new(**kwargs) TableSegment

Creates a copy of the instance using ‘replace()’

count() int

Count how many rows are in the segment, in one pass.

count_and_checksum() Tuple[int, int]

Count and checksum the rows in the segment, in one pass.

__init__(database: ~sqeleton.databases.base.Database = <object object>, table_path: ~typing.Tuple[str, ...] = <object object>, key_columns: ~typing.Tuple[str, ...] = <object object>, update_column: str | None = None, extra_columns: ~typing.Tuple[str, ...] = (), min_key: ~reladiff.utils.Vector | None = None, max_key: ~reladiff.utils.Vector | None = None, min_update: ~datetime.datetime | None = None, max_update: ~datetime.datetime | None = None, where: str | None = None, case_sensitive: bool = True, _schema: ~sqeleton.utils.CaseAwareMapping | None = None) None
class reladiff.DiffResultWrapper(diff: iter = <object object>, info_tree: ~reladiff.info_tree.InfoTree = <object object>, stats: dict = <object object>, _ti: ~reladiff.thread_utils.ThreadedYielder = <object object>, result_list: list = <factory>)

Wrapper for the diff result, with additional stats and info

Supports reenterant iteration, context management, and immediate closing of the thread pool.

Note: Once the threadpool is closed, the iterator will not be able to continue.

__iter__()

Iterate over the results of the diff.

It’s a “lazy-list”: Repeated calls will return the same results, but will not re-run the diff.

close()

Immediately stop diffing and close the thread pool

get_stats_string()

Return a pretty string of the diff stats (used by the CLI)

get_stats_dict()

Return a dictionary of the diff stats

reladiff.DbTime = <class 'datetime.datetime'>

datetime(year, month, day[, hour[, minute[, second[, microsecond[,tzinfo]]]]])

The year, month and day arguments are required. tzinfo may be None, or an instance of a tzinfo subclass. The remaining arguments may be ints.

reladiff.DbPath

The central part of internal API.

This represents a generic version of type ‘origin’ with type arguments ‘params’. There are two kind of these aliases: user defined and special. The special ones are wrappers around builtin collections and ABCs in collections.abc. These must have ‘name’ always set. If ‘inst’ is False, then the alias can’t be instantiated, this is used by e.g. typing.List and typing.Dict.

alias of Tuple[str, …]

enum reladiff.Algorithm(value)

An enumeration.

Valid values are as follows:

AUTO = <Algorithm.AUTO: 'auto'>
JOINDIFF = <Algorithm.JOINDIFF: 'joindiff'>
HASHDIFF = <Algorithm.HASHDIFF: 'hashdiff'>