MatchFlow

MatchFlow - A toolkit for entity matching.

This package provides tools for creating, training, and applying entity matchers using various tokenization, featurization, and machine learning techniques.

class MatchFlow.CLILabeler(a_df, b_df, id_col: str = '_id')[source]

Bases: Labeler

CLI for labeling pairs of records.

Parameters:
  • a_df (Union[pd.DataFrame, SparkDataFrame]) – the first dataframe

  • b_df (Union[pd.DataFrame, SparkDataFrame]) – the second dataframe

  • id_col (str, default '_id') – the column name of the id column

class MatchFlow.CustomLabeler(a_df, b_df, id_col: str = '_id')[source]

Bases: Labeler

Custom labeler for labeling pairs of records.

Parameters:
  • a_df (Union[pd.DataFrame, SparkDataFrame]) – the first dataframe

  • b_df (Union[pd.DataFrame, SparkDataFrame]) – the second dataframe

  • id_col (str, default '_id') – the column name of the id column

abstractmethod label_pair(row1, row2)[source]

Label the pair (row1, row2).

Returns:

the label for the pair (1.0 for a match, 0.0 for a non-match)

Return type:

float
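Only label_pair needs to be implemented by a subclass. A standalone sketch of the pattern (the Labeler base here is a local stand-in rather than the MatchFlow import, and the 'name' field is hypothetical):

```python
from abc import ABC, abstractmethod

class Labeler(ABC):
    """Stand-in for the MatchFlow base class, just to show the pattern."""
    @abstractmethod
    def label_pair(self, row1, row2) -> float: ...

class ExactNameLabeler(Labeler):
    """Labels a pair a match when the (hypothetical) 'name' fields agree."""
    def label_pair(self, row1, row2) -> float:
        a = str(row1.get("name", "")).strip().lower()
        b = str(row2.get("name", "")).strip().lower()
        return 1.0 if a and a == b else 0.0

label = ExactNameLabeler().label_pair({"name": "ACME Corp"}, {"name": "acme corp"})
```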

class MatchFlow.Feature(a_attr: str, b_attr: str)[source]

Bases: ABC

property a_attr

the name of the attribute from table A used to generate this feature

property b_attr

the name of the attribute from table B used to generate this feature

build(A, B, cache)[source]

Guaranteed to be called before the feature’s preprocessing is done. This method should generate and store all of the metadata required to compute the feature over A and B. NOTE: B may be None.

preprocess(data, is_table_a)[source]

preprocess the data, adding the output column to data

preprocess_output_column(for_table_a: bool)[source]

get the name of the preprocessing output column for table A or B

classmethod template(**kwargs)[source]
class MatchFlow.GoldLabeler(gold)[source]

Bases: Labeler

Gold labeler for labeling pairs of records.

Parameters:

gold (Union[pd.DataFrame, SparkDataFrame]) – the gold dataframe, should contain columns ‘id1’ and ‘id2’
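Conceptually, a gold labeler reduces to a membership check against the gold set. A standalone sketch with plain Python sets instead of DataFrames:

```python
# Gold pairs, analogous to the 'id1'/'id2' columns of the gold DataFrame.
gold = {(1, 10), (2, 20)}

def gold_label(id1, id2) -> float:
    """Return 1.0 if the pair appears in the gold set, else 0.0."""
    return 1.0 if (id1, id2) in gold else 0.0
```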

class MatchFlow.Labeler[source]

Bases: ABC

Base class for labelers.

class MatchFlow.MLModel[source]

Bases: ABC

Abstract base class for machine learning models.

This class defines the interface that all machine learning models must implement, whether they are scikit-learn models or PySpark ML models. It provides methods for training, prediction, confidence estimation, and entropy calculation.

nan_fill

Value to use for filling NaN values in feature vectors

Type:

float or None

use_vectors

Whether the model expects feature vectors in vector format

Type:

bool

use_floats

Whether the model uses float32 (True) or float64 (False) precision

Type:

bool

abstractmethod entropy(df: pd.DataFrame | pyspark.sql.DataFrame, vector_col: str, output_col: str) pd.DataFrame | pyspark.sql.DataFrame[source]

Calculate entropy of predictions.

Parameters:
  • df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors

  • vector_col (str) – Name of the column containing feature vectors

  • output_col (str) – Name of the column to store entropy values in

Returns:

The input DataFrame with entropy values added in the output_col

Return type:

pandas.DataFrame or pyspark.sql.DataFrame
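For a binary matcher, the natural choice here is the Shannon entropy of the predicted class distribution; the exact formula a given MLModel implementation uses may differ, but a sketch for a match probability p is:

```python
import math

def binary_entropy(p: float) -> float:
    """Shannon entropy (in nats) of a Bernoulli(p) prediction.
    Maximal at p = 0.5 (the model is most uncertain),
    zero at p = 0.0 or p = 1.0."""
    if p <= 0.0 or p >= 1.0:
        return 0.0
    return -(p * math.log(p) + (1 - p) * math.log(1 - p))
```

Entropy-based active learners (see label_data) use this ranking to pick the pairs the model is least sure about for labeling.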

abstract property nan_fill: float | None

Value to use for filling NaN values in feature vectors.

Returns:

The value to use for filling NaN values, or None if no filling is needed

Return type:

float or None

abstractmethod params_dict() dict[source]

Get a dictionary of model parameters.

Returns:

Dictionary containing model parameters and configuration

Return type:

dict

abstractmethod predict(df: pd.DataFrame | pyspark.sql.DataFrame, vector_col: str, output_col: str) pd.DataFrame | pyspark.sql.DataFrame[source]

Make predictions using the trained model.

Parameters:
  • df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors to predict on

  • vector_col (str) – Name of the column containing feature vectors

  • output_col (str) – Name of the column to store predictions in

Returns:

The input DataFrame with predictions added in the output_col

Return type:

pandas.DataFrame or pyspark.sql.DataFrame

abstractmethod predict_with_confidence(df: pd.DataFrame | pyspark.sql.DataFrame, vector_col: str, prediction_col: str, confidence_col: str) pd.DataFrame | pyspark.sql.DataFrame[source]

Make predictions and confidence scores using the trained model.

This method is more efficient than calling predict() and prediction_conf() separately as it computes both in a single pass when possible.

Parameters:
  • df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors to predict on

  • vector_col (str) – Name of the column containing feature vectors

  • prediction_col (str) – Name of the column to store predictions in

  • confidence_col (str) – Name of the column to store confidence scores in

Returns:

The input DataFrame with predictions and confidence scores added

Return type:

pandas.DataFrame or pyspark.sql.DataFrame

abstractmethod prediction_conf(df: pd.DataFrame | pyspark.sql.DataFrame, vector_col: str, label_column: str) pd.DataFrame | pyspark.sql.DataFrame[source]

Calculate prediction confidence scores.

Parameters:
  • df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors

  • vector_col (str) – Name of the column containing feature vectors

  • label_column (str) – Name of the column containing true labels

Returns:

The input DataFrame with confidence scores added

Return type:

pandas.DataFrame or pyspark.sql.DataFrame

prep_fvs(fvs: pd.DataFrame | pyspark.sql.DataFrame, feature_col: str = 'feature_vectors') pd.DataFrame | pyspark.sql.DataFrame[source]

Prepare feature vectors for model input.

This method handles NaN filling and conversion between vector and array formats based on the model’s requirements.

Parameters:
  • fvs (pandas.DataFrame or pyspark.sql.DataFrame) – DataFrame containing feature vectors

  • feature_col (str, optional) – Name of the column containing feature vectors

Returns:

DataFrame with prepared feature vectors

Return type:

pandas.DataFrame or pyspark.sql.DataFrame
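The two jobs prep_fvs performs are NaN filling (driven by nan_fill) and conversion between vector/array formats and precisions (driven by use_vectors and use_floats). A standalone sketch of just the NaN-filling half, over a plain Python list rather than a DataFrame column:

```python
import math

def prep_vector(fv, nan_fill):
    """Fill NaN/None entries of one feature vector with nan_fill,
    or leave them as NaN when nan_fill is None (no filling needed)."""
    filled = []
    for x in fv:
        is_nan = x is None or (isinstance(x, float) and math.isnan(x))
        if is_nan:
            filled.append(float("nan") if nan_fill is None else float(nan_fill))
        else:
            filled.append(float(x))
    return filled
```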

abstractmethod train(df: pd.DataFrame | pyspark.sql.DataFrame, vector_col: str, label_column: str)[source]

Train the model on the given data.

Parameters:
  • df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing training data

  • vector_col (str) – Name of the column containing feature vectors

  • label_column (str) – Name of the column containing labels

Returns:

The trained model (self)

Return type:

MLModel

abstract property trained_model

The trained ML Model object

Returns:

The trained ML Model object

Return type:

MLModel

abstract property use_floats: bool

Whether the model uses float32 or float64 precision.

Returns:

True if the model uses float32, False if it uses float64

Return type:

bool

abstract property use_vectors: bool

Whether the model expects feature vectors in vector format.

Returns:

True if the model expects vectors, False if it expects arrays

Return type:

bool

class MatchFlow.SKLearnModel(model, nan_fill=None, use_floats=True, **model_args)[source]

Bases: MLModel

Scikit-learn model wrapper.

This class wraps scikit-learn models to provide a consistent interface with PySpark ML models. It handles conversion between pandas and PySpark DataFrames, and manages model training and prediction.

Parameters:
  • model (sklearn.base.BaseEstimator or type) – The scikit-learn model class or instance to use

  • nan_fill (float or None, optional) – Value to use for filling NaN values

  • use_floats (bool, optional) – Whether to use float32 (True) or float64 (False) precision

  • **model_args (dict) – Additional arguments to pass to the model constructor

cross_val_scores(df, vector_col: str, label_column: str, cv: int = 10)[source]
entropy(df, vector_col: str, output_col: str)[source]

Calculate entropy of predictions.

Parameters:
  • df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors

  • vector_col (str) – Name of the column containing feature vectors

  • output_col (str) – Name of the column to store entropy values in

Returns:

The input DataFrame with entropy values added in the output_col

Return type:

pandas.DataFrame or pyspark.sql.DataFrame

get_model()[source]
property nan_fill

Value to use for filling NaN values in feature vectors.

Returns:

The value to use for filling NaN values, or None if no filling is needed

Return type:

float or None

params_dict()[source]

Get a dictionary of model parameters.

Returns:

Dictionary containing model parameters and configuration

Return type:

dict

predict(df: pd.DataFrame | pyspark.sql.DataFrame, vector_col: str, output_col: str) pd.DataFrame | pyspark.sql.DataFrame[source]

Make predictions using the trained model.

Parameters:
  • df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors to predict on

  • vector_col (str) – Name of the column containing feature vectors

  • output_col (str) – Name of the column to store predictions in

Returns:

The input DataFrame with predictions added in the output_col

Return type:

pandas.DataFrame or pyspark.sql.DataFrame

predict_with_confidence(df: pd.DataFrame | pyspark.sql.DataFrame, vector_col: str, prediction_col: str, confidence_col: str) pd.DataFrame | pyspark.sql.DataFrame[source]

Make predictions and confidence scores using the trained model.

This method is more efficient than calling predict() and prediction_conf() separately as it computes both in a single pass when possible.

Parameters:
  • df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors to predict on

  • vector_col (str) – Name of the column containing feature vectors

  • prediction_col (str) – Name of the column to store predictions in

  • confidence_col (str) – Name of the column to store confidence scores in

Returns:

The input DataFrame with predictions and confidence scores added

Return type:

pandas.DataFrame or pyspark.sql.DataFrame

prediction_conf(df, vector_col: str, output_col: str)[source]

Calculate prediction confidence scores.

Parameters:
  • df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors

  • vector_col (str) – Name of the column containing feature vectors

  • output_col (str) – Name of the column to store confidence scores in

Returns:

The input DataFrame with confidence scores added

Return type:

pandas.DataFrame or pyspark.sql.DataFrame

train(df, vector_col: str, label_column: str)[source]

Train the model on the given data.

Parameters:
  • df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing training data

  • vector_col (str) – Name of the column containing feature vectors

  • label_column (str) – Name of the column containing labels

Returns:

The trained model (self)

Return type:

MLModel

property trained_model

The trained ML Model object

Returns:

The trained ML Model object

Return type:

MLModel

property use_floats

Whether the model uses float32 or float64 precision.

Returns:

True if the model uses float32, False if it uses float64

Return type:

bool

property use_vectors

Whether the model expects feature vectors in vector format.

Returns:

True if the model expects vectors, False if it expects arrays

Return type:

bool

class MatchFlow.SparkMLModel(model, nan_fill=0.0, **model_args)[source]

Bases: MLModel

entropy(df, vector_col: str, output_col: str)[source]

Calculate entropy of predictions.

Parameters:
  • df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors

  • vector_col (str) – Name of the column containing feature vectors

  • output_col (str) – Name of the column to store entropy values in

Returns:

The input DataFrame with entropy values added in the output_col

Return type:

pandas.DataFrame or pyspark.sql.DataFrame

get_model()[source]
property nan_fill

Value to use for filling NaN values in feature vectors.

Returns:

The value to use for filling NaN values, or None if no filling is needed

Return type:

float or None

params_dict()[source]

Get a dictionary of model parameters.

Returns:

Dictionary containing model parameters and configuration

Return type:

dict

predict(df: pd.DataFrame | pyspark.sql.DataFrame, vector_col: str, output_col: str) pd.DataFrame | pyspark.sql.DataFrame[source]

Make predictions using the trained model.

Parameters:
  • df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors to predict on

  • vector_col (str) – Name of the column containing feature vectors

  • output_col (str) – Name of the column to store predictions in

Returns:

The input DataFrame with predictions added in the output_col

Return type:

pandas.DataFrame or pyspark.sql.DataFrame

predict_with_confidence(df: pd.DataFrame | pyspark.sql.DataFrame, vector_col: str, prediction_col: str, confidence_col: str) pd.DataFrame | pyspark.sql.DataFrame[source]

Make predictions and confidence scores using the trained model.

This method is more efficient than calling predict() and prediction_conf() separately as it computes both in a single pass when possible.

Parameters:
  • df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors to predict on

  • vector_col (str) – Name of the column containing feature vectors

  • prediction_col (str) – Name of the column to store predictions in

  • confidence_col (str) – Name of the column to store confidence scores in

Returns:

The input DataFrame with predictions and confidence scores added

Return type:

pandas.DataFrame or pyspark.sql.DataFrame

prediction_conf(df, vector_col: str, output_col: str)[source]

Calculate prediction confidence scores.

Parameters:
  • df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors

  • vector_col (str) – Name of the column containing feature vectors

  • output_col (str) – Name of the column to store confidence scores in

Returns:

The input DataFrame with confidence scores added

Return type:

pandas.DataFrame or pyspark.sql.DataFrame

train(df, vector_col: str, label_column: str)[source]

Train the model on the given data.

Parameters:
  • df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing training data

  • vector_col (str) – Name of the column containing feature vectors

  • label_column (str) – Name of the column containing labels

Returns:

The trained model (self)

Return type:

MLModel

property trained_model

The trained ML Model object

Returns:

The trained ML Model object

Return type:

MLModel

property use_floats

Whether the model uses float32 or float64 precision.

Returns:

True if the model uses float32, False if it uses float64

Return type:

bool

property use_vectors

Whether the model expects feature vectors in vector format.

Returns:

True if the model expects vectors, False if it expects arrays

Return type:

bool

class MatchFlow.Tokenizer[source]

Bases: ABC

out_col_name(input_col)[source]

the name of the output column from the tokenizer, e.g. for a 3gram tokenizer applied to the name column, the output column could be “3gram(name)”

abstractmethod tokenize(s)[source]

convert the string into a bag of tokens (tokens should not be deduplicated)

tokenize_set(s)[source]

tokenize the string and return a set of tokens, or None if tokenize returns None

tokenize_spark(input_col: Column)[source]

return a column expression that gives the same output as the tokenize method; required for efficiency when building metadata for certain methods
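A concrete tokenizer mainly has to implement tokenize as a bag (duplicates preserved); tokenize_set then derives the deduplicated view. A standalone sketch of a 3-gram tokenizer in that style (not the package’s own QGramTokenizer, whose details may differ):

```python
def qgram_tokenize(s, q=3):
    """Return the bag of q-grams of s (duplicates kept), or None for None input."""
    if s is None:
        return None
    return [s[i:i + q] for i in range(len(s) - q + 1)]

def qgram_tokenize_set(s, q=3):
    """Deduplicated view, mirroring Tokenizer.tokenize_set."""
    toks = qgram_tokenize(s, q)
    return None if toks is None else set(toks)

def out_col_name(input_col, q=3):
    """Output column name, e.g. '3gram(name)' for the name column."""
    return f"{q}gram({input_col})"
```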

class MatchFlow.Vectorizer[source]

Bases: object

Base class for all vectorizers.

build_from_doc_freqs(doc_freqs)[source]
init()[source]
out_col_name(base)[source]
vectorize(tokens)[source]
class MatchFlow.WebUILabeler(a_df, b_df, id_col: str = '_id', flask_port: int = 5005, streamlit_port: int = 8501, flask_host: str = '127.0.0.1')[source]

Bases: Labeler

Web interface for labeling pairs of records.

Parameters:
  • a_df (Union[pd.DataFrame, SparkDataFrame])

  • b_df (Union[pd.DataFrame, SparkDataFrame])

  • id_col (str, default '_id')

  • flask_port (int, default 5005)

  • streamlit_port (int, default 8501)

  • flask_host (str, default '127.0.0.1')

MatchFlow.apply_matcher(model: MLModel, df: pd.DataFrame | pyspark.sql.DataFrame, feature_col: str, prediction_col: str, confidence_col: str | None = None) pd.DataFrame | pyspark.sql.DataFrame[source]

Apply a trained model to make predictions.

Parameters:
  • model (MLModel) – A trained MLModel instance

  • df (Union[pd.DataFrame, SparkDataFrame]) – The DataFrame to make predictions on

  • feature_col (str) – Name of the column containing feature vectors

  • prediction_col (str) – Name of the column to store predictions in

  • confidence_col (str, optional) – Name of the column to store confidence scores in. If provided, both predictions and confidence scores will be computed efficiently in a single pass.

Returns:

The input DataFrame with predictions added (and confidence scores if requested)

Return type:

Union[pd.DataFrame, SparkDataFrame]

MatchFlow.create_features(A: pd.DataFrame | pyspark.sql.DataFrame, B: pd.DataFrame | pyspark.sql.DataFrame, a_cols: List[str], b_cols: List[str], sim_functions: List[Callable[[...], Any]] | None = None, tokenizers: List[Callable[[...], Any]] | None = None, null_threshold: float = 0.5) List[Callable][source]

creates the features which will be used to featurize your tuple pairs

Parameters:
  • A (Union[pd.DataFrame, SparkDataFrame]) – the records of table A

  • B (Union[pd.DataFrame, SparkDataFrame]) – the records of table B

  • a_cols (list) – The names of the columns for DataFrame A that should have features generated

  • b_cols (list) – The names of the columns for DataFrame B that should have features generated

  • sim_functions (list of callables, optional) – similarity functions to apply (default: None)

  • tokenizers (list of callables, optional) – tokenizers to use (default: None)

  • null_threshold (float) – the portion of values that must be null in order for the column to be dropped and not considered for feature generation

Returns:

a list containing initialized feature objects for columns in A, B

Return type:

List[Callable]

MatchFlow.create_seeds(fvs: pd.DataFrame | pyspark.sql.DataFrame, nseeds: int, labeler: Labeler, score_column: str = 'score', parquet_file_path: str = 'active-matcher-training-data.parquet') pd.DataFrame | pyspark.sql.DataFrame[source]

Create labeled seed examples for active learning.

Parameters:
  • fvs (Union[pd.DataFrame, SparkDataFrame]) – DataFrame containing feature vectors with scores

  • nseeds (int) – the number of seeds you want to use to train an initial model

  • labeler (Labeler) – the labeler object you want to use to assign labels to rows

  • score_column (str, default='score') – the name of the score column in your fvs DataFrame

  • parquet_file_path (str, default='active-matcher-training-data.parquet') – The path to save the labeled data to

Returns:

A DataFrame with labeled seeds, schema is (previous schema of fvs, label) where the values in label are either 0.0 or 1.0

Return type:

Union[pd.DataFrame, SparkDataFrame]

MatchFlow.down_sample(fvs: pd.DataFrame | pyspark.sql.DataFrame, percent: float, search_id_column: str, score_column: str = 'score', bucket_size: int = 1000) pd.DataFrame | pyspark.sql.DataFrame[source]

down sample by score_column to produce percent * fvs.count() rows

Parameters:
  • fvs (Union[pd.DataFrame, SparkDataFrame]) – the feature vectors to be downsampled

  • percent (float) – the portion of the vectors to be output, (0.0, 1.0]

  • search_id_column (str) – the name of the column containing unique identifiers for each record

  • score_column (str) – the column that scored the vectors, should be positively correlated with the probability of the pair being a match

  • bucket_size (int, default 1000) – the size of the buckets for partitioning

Returns:

the down sampled dataset with percent * fvs.count() rows with the same schema as fvs

Return type:

Union[pd.DataFrame, SparkDataFrame]
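The idea behind down sampling by score is to shrink the data while keeping every score range represented: rows are grouped into fixed-size buckets in score order and a fraction is drawn from each. A standalone sketch over (id, score) tuples (the per-bucket selection here is an assumption about one reasonable implementation, not the package’s exact algorithm):

```python
def down_sample(rows, percent, bucket_size=1000):
    """rows: list of (id, score). Keep ~percent of each score-sorted
    bucket, so every score range stays represented in the sample."""
    ordered = sorted(rows, key=lambda r: r[1], reverse=True)
    kept = []
    for start in range(0, len(ordered), bucket_size):
        bucket = ordered[start:start + bucket_size]
        k = max(1, round(len(bucket) * percent))
        kept.extend(bucket[:k])  # take the top-scored part of each bucket
    return kept
```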

MatchFlow.featurize(features: List[Callable], A, B, candidates, output_col: str = 'feature_vectors', fill_na: float = 0.0) pd.DataFrame | pyspark.sql.DataFrame[source]

applies the featurizer to the record pairs in candidates

Parameters:
  • features (List[Callable]) – a list containing initialized feature objects for columns in A, B

  • A (Union[pd.DataFrame, SparkDataFrame]) – the records of table A

  • B (Union[pd.DataFrame, SparkDataFrame]) – the records of table B

  • candidates (Union[pd.DataFrame, SparkDataFrame]) – id pairs of A and B that are potential matches, with the following columns: id2 (id from table B) and id1_list (list of candidate ids from table A)

  • output_col (str) – the name of the column for the resulting feature vectors, default feature_vectors

  • fill_na (float) – value to fill in for missing data, default 0.0

Returns:

DataFrame with feature vectors created with the following schema: (id2, id1, output_col, other columns from candidates). Returns pandas DataFrame if inputs A and B are pandas DataFrames, otherwise returns Spark DataFrame.

Return type:

Union[pd.DataFrame, SparkDataFrame]
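The candidates layout groups candidate ids from A under each B id; flattening it yields the (id2, id1) pairs at which feature vectors are produced. A standalone sketch with plain dicts:

```python
# Candidate structure with the documented columns: id2 plus id1_list.
candidates = [
    {"id2": 10, "id1_list": [1, 2]},
    {"id2": 11, "id1_list": [2]},
]

def flatten_candidates(cands):
    """Expand each (id2, id1_list) row into (id2, id1) pairs,
    the granularity at which feature vectors are computed."""
    return [(row["id2"], id1) for row in cands for id1 in row["id1_list"]]
```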

MatchFlow.get_base_sim_functions()[source]

get the base similarity functions

Returns:

a list of similarity functions, currently includes:
  • TFIDFFeature

  • JaccardFeature

  • SIFFeature

  • OverlapCoeffFeature

  • CosineFeature

Return type:

list

MatchFlow.get_base_tokenizers()[source]

get the base tokenizers

Returns:

a list of tokenizers, currently includes:
  • StrippedWhiteSpaceTokenizer

  • NumericTokenizer

  • QGramTokenizer(3)

Return type:

list

MatchFlow.get_extra_tokenizers()[source]

get the extra tokenizers

Returns:

a list of extra tokenizers, currently includes:
  • AlphaNumericTokenizer

  • QGramTokenizer(5)

  • StrippedQGramTokenizer(3)

  • StrippedQGramTokenizer(5)

Return type:

list

MatchFlow.label_data(model: MLModel, mode: Literal['batch', 'continuous'], labeler: Labeler, fvs: pd.DataFrame | pyspark.sql.DataFrame, seeds: pd.DataFrame | pyspark.sql.DataFrame | None = None, parquet_file_path: str = 'active-matcher-training-data.parquet', **learner_kwargs) pd.DataFrame | pyspark.sql.DataFrame[source]

Generate labeled data using active learning.

Parameters:
  • model (MLModel) – An MLModel instance

  • mode (Literal["batch", "continuous"]) – Whether to use batch or continuous active learning

  • labeler (Labeler) – A Labeler instance

  • fvs (Union[pd.DataFrame, SparkDataFrame]) – The data that needs to be labeled

  • seeds (Union[pandas DataFrame, SparkDataFrame], optional) – Initial labeled examples to start with

  • parquet_file_path (str, default='active-matcher-training-data.parquet') – The path to save the labeled data to

  • **learner_kwargs – Additional keyword arguments to pass to the active learner constructor. For batch mode, see EntropyActiveLearner (e.g. batch_size, max_iter). For continuous mode, see ContinuousEntropyActiveLearner (e.g. queue_size, max_labeled, on_demand_stop).

Returns:

DataFrame with ids of potential matches and the corresponding label

Return type:

Union[pd.DataFrame, SparkDataFrame]
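Batch-mode entropy-based active learning loops: train on the labeled pool, rank the unlabeled pool by uncertainty, label the most uncertain batch, repeat. A standalone sketch of the selection step (for a binary model, entropy decreases monotonically with |p - 0.5|, so ranking by that distance is equivalent; everything else here is a stand-in):

```python
def select_batch(probs, batch_size):
    """Indexes of the batch_size most uncertain predictions.
    Probabilities nearest 0.5 have the highest entropy, so sorting
    by distance from 0.5 ranks by entropy without computing it."""
    ranked = sorted(range(len(probs)), key=lambda i: abs(probs[i] - 0.5))
    return ranked[:batch_size]
```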

MatchFlow.label_pairs(labeler: Labeler, pairs: pd.DataFrame | pyspark.sql.DataFrame) pd.DataFrame | pyspark.sql.DataFrame[source]

Label pairs without active learning.

Parameters:
  • labeler (Labeler) – A Labeler instance

  • pairs (Union[pd.DataFrame, SparkDataFrame]) – The pairs to label

Returns:

DataFrame with labeled pairs

Return type:

Union[pd.DataFrame, SparkDataFrame]

MatchFlow.load_dataframe(path, df_type)[source]

Load a DataFrame from disk based on the specified type.

Parameters:
  • path (str) – Path to the saved DataFrame file

  • df_type (str) – Type of DataFrame to load (‘pandas’ or ‘sparkdf’)

Returns:

Loaded DataFrame

Return type:

Union[pd.DataFrame, pyspark.sql.DataFrame]

MatchFlow.load_features(path)[source]

Load a list of feature objects from disk using pickle deserialization.

Parameters:

path (str) – Path to the saved features file

Returns:

List of loaded feature objects

Return type:

List[Callable]

MatchFlow.save_dataframe(dataframe, path)[source]

Save a DataFrame to disk, automatically detecting whether it’s a pandas or Spark DataFrame.

Parameters:
  • dataframe (Union[pd.DataFrame, pyspark.sql.DataFrame]) – DataFrame to save (pandas or Spark)

  • path (str) – Path where to save the DataFrame

Return type:

None

MatchFlow.save_features(features, path)[source]

Save a list of feature objects to disk using pickle serialization.

Parameters:
  • features (List[Callable]) – List of feature objects to save

  • path (str) – Path where to save the features file

Return type:

None

MatchFlow.train_matcher(model: MLModel, labeled_data: pd.DataFrame | pyspark.sql.DataFrame, feature_col: str = 'feature_vectors', label_col: str = 'label') MLModel[source]

Train a matcher model on labeled data.

Parameters:
  • model (MLModel) – An MLModel instance to train

  • labeled_data (Union[pd.DataFrame, SparkDataFrame]) – DataFrame containing the labeled data

  • feature_col (str, default="feature_vectors") – Name of the column containing feature vectors

  • label_col (str, default="label") – Name of the column containing labels

Returns:

The trained model

Return type:

MLModel
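The train_matcher/apply_matcher contract boils down to: fit a model on (feature vector, label) rows, then add a prediction column to new rows. A toy standalone illustration of that contract with a one-feature threshold model (not MatchFlow code; the midpoint rule is purely illustrative):

```python
def train_threshold(labeled):
    """labeled: list of (score, label) with labels 0.0/1.0. Learn the
    midpoint between the mean positive and mean negative scores as a
    decision threshold, mirroring train_matcher's fit-and-return-self role."""
    pos = [s for s, y in labeled if y == 1.0]
    neg = [s for s, y in labeled if y == 0.0]
    return (sum(pos) / len(pos) + sum(neg) / len(neg)) / 2

def apply_threshold(threshold, rows):
    """rows: list of (id, score) -> list of (id, score, prediction),
    mirroring apply_matcher's add-a-prediction-column role."""
    return [(i, s, 1.0 if s >= threshold else 0.0) for i, s in rows]
```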