MatchFlow
MatchFlow - A toolkit for entity matching.
This package provides tools for creating, training, and applying entity matchers using various tokenization, featurization, and machine learning techniques.
- class MatchFlow.CLILabeler(a_df, b_df, id_col: str = '_id')[source]
Bases: Labeler
CLI for labeling pairs of records.
- Parameters:
a_df (Union[pd.DataFrame, SparkDataFrame]) – the first dataframe
b_df (Union[pd.DataFrame, SparkDataFrame]) – the second dataframe
id_col (str, default '_id') – the column name of the id column
- class MatchFlow.CustomLabeler(a_df, b_df, id_col: str = '_id')[source]
Bases: Labeler
Custom labeler for labeling pairs of records.
- Parameters:
a_df (Union[pd.DataFrame, SparkDataFrame]) – the first dataframe
b_df (Union[pd.DataFrame, SparkDataFrame]) – the second dataframe
id_col (str, default '_id') – the column name of the id column
- class MatchFlow.Feature(a_attr: str, b_attr: str)[source]
Bases: ABC
- property a_attr
the name of the attribute from table a used to generate this feature
- property b_attr
the name of the attribute from table b used to generate this feature
- build(A, B, cache)[source]
Guaranteed to be called before the feature's preprocessing is done. This method should generate and store all of the metadata required to compute the feature over A and B. Note: B may be None.
- class MatchFlow.GoldLabeler(gold)[source]
Bases: Labeler
Gold labeler for labeling pairs of records.
- Parameters:
gold (Union[pd.DataFrame, SparkDataFrame]) – the gold dataframe, should contain columns ‘id1’ and ‘id2’
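As a sketch, the gold DataFrame might be built like this (the ids are hypothetical; only the 'id1'/'id2' column names are required by GoldLabeler):

```python
import pandas as pd

# Minimal sketch of the gold DataFrame expected by GoldLabeler:
# one row per known match, with columns 'id1' and 'id2' (assumed here
# to refer to records of tables A and B respectively).
gold = pd.DataFrame({
    "id1": [0, 1, 2],
    "id2": [10, 11, 12],
})
# With MatchFlow installed, this would then drive automatic labeling:
#   labeler = MatchFlow.GoldLabeler(gold)
```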
- class MatchFlow.MLModel[source]
Bases: ABC
Abstract base class for machine learning models.
This class defines the interface that all machine learning models must implement, whether they are scikit-learn models or PySpark ML models. It provides methods for training, prediction, confidence estimation, and entropy calculation.
- nan_fill
Value to use for filling NaN values in feature vectors
- Type:
float or None
- use_vectors
Whether the model expects feature vectors in vector format
- Type:
bool
- use_floats
Whether the model uses float32 (True) or float64 (False) precision
- Type:
bool
- abstractmethod entropy(df: pd.DataFrame | SparkDataFrame, vector_col: str, output_col: str) → pd.DataFrame | SparkDataFrame[source]
Calculate entropy of predictions.
- Parameters:
df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors
vector_col (str) – Name of the column containing feature vectors
output_col (str) – Name of the column to store entropy values in
- Returns:
The input DataFrame with entropy values added in the output_col
- Return type:
pandas.DataFrame or pyspark.sql.DataFrame
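For intuition, entropy here plays the usual active-learning role: pairs whose predicted match probability is near 0.5 have high entropy and are the most informative to send to the labeler. A minimal illustration in plain Python (not MatchFlow's implementation):

```python
import math

# Shannon entropy (in bits) of a Bernoulli match prediction p.
# High entropy = the model is uncertain about the pair.
def binary_entropy(p: float) -> float:
    if p in (0.0, 1.0):
        return 0.0
    return -(p * math.log2(p) + (1 - p) * math.log2(1 - p))

binary_entropy(0.5)   # maximal uncertainty: 1.0 bit
binary_entropy(0.99)  # confident prediction: close to 0
```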
- abstract property nan_fill: float | None
Value to use for filling NaN values in feature vectors.
- Returns:
The value to use for filling NaN values, or None if no filling is needed
- Return type:
float or None
- abstractmethod params_dict() dict[source]
Get a dictionary of model parameters.
- Returns:
Dictionary containing model parameters and configuration
- Return type:
dict
- abstractmethod predict(df: pd.DataFrame | SparkDataFrame, vector_col: str, output_col: str) → pd.DataFrame | SparkDataFrame[source]
Make predictions using the trained model.
- Parameters:
df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors to predict on
vector_col (str) – Name of the column containing feature vectors
output_col (str) – Name of the column to store predictions in
- Returns:
The input DataFrame with predictions added in the output_col
- Return type:
pandas.DataFrame or pyspark.sql.DataFrame
- abstractmethod predict_with_confidence(df: pd.DataFrame | SparkDataFrame, vector_col: str, prediction_col: str, confidence_col: str) → pd.DataFrame | SparkDataFrame[source]
Make predictions and confidence scores using the trained model.
This method is more efficient than calling predict() and prediction_conf() separately as it computes both in a single pass when possible.
- Parameters:
df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors to predict on
vector_col (str) – Name of the column containing feature vectors
prediction_col (str) – Name of the column to store predictions in
confidence_col (str) – Name of the column to store confidence scores in
- Returns:
The input DataFrame with predictions and confidence scores added
- Return type:
pandas.DataFrame or pyspark.sql.DataFrame
- abstractmethod prediction_conf(df: pd.DataFrame | SparkDataFrame, vector_col: str, label_column: str) → pd.DataFrame | SparkDataFrame[source]
Calculate prediction confidence scores.
- Parameters:
df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors
vector_col (str) – Name of the column containing feature vectors
label_column (str) – Name of the column containing true labels
- Returns:
The input DataFrame with confidence scores added
- Return type:
pandas.DataFrame or pyspark.sql.DataFrame
- prep_fvs(fvs: pd.DataFrame | SparkDataFrame, feature_col: str = 'feature_vectors') → pd.DataFrame | SparkDataFrame[source]
Prepare feature vectors for model input.
This method handles NaN filling and conversion between vector and array formats based on the model’s requirements.
- Parameters:
fvs (pandas.DataFrame or pyspark.sql.DataFrame) – DataFrame containing feature vectors
feature_col (str, optional) – Name of the column containing feature vectors
- Returns:
DataFrame with prepared feature vectors
- Return type:
pandas.DataFrame or pyspark.sql.DataFrame
- abstractmethod train(df: pd.DataFrame | SparkDataFrame, vector_col: str, label_column: str)[source]
Train the model on the given data.
- Parameters:
df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing training data
vector_col (str) – Name of the column containing feature vectors
label_column (str) – Name of the column containing labels
- Returns:
The trained model (self)
- Return type:
MLModel
- abstract property trained_model
The trained ML Model object
- Returns:
The trained ML Model object
- Return type:
- abstract property use_floats: bool
Whether the model uses float32 or float64 precision.
- Returns:
True if the model uses float32, False if it uses float64
- Return type:
bool
- abstract property use_vectors: bool
Whether the model expects feature vectors in vector format.
- Returns:
True if the model expects vectors, False if it expects arrays
- Return type:
bool
- class MatchFlow.SKLearnModel(model, nan_fill=None, use_floats=True, **model_args)[source]
Bases: MLModel
Scikit-learn model wrapper.
This class wraps scikit-learn models to provide a consistent interface with PySpark ML models. It handles conversion between pandas and PySpark DataFrames, and manages model training and prediction.
- Parameters:
model (sklearn.base.BaseEstimator or type) – The scikit-learn model class or instance to use
nan_fill (float or None, optional) – Value to use for filling NaN values
use_floats (bool, optional) – Whether to use float32 (True) or float64 (False) precision
**model_args (dict) – Additional arguments to pass to the model constructor
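A hedged sketch of what the wrapper handles for you — NaN filling and float32 conversion — using a toy pandas DataFrame and scikit-learn directly. With MatchFlow installed, the single line `SKLearnModel(RandomForestClassifier, nan_fill=0.0, n_estimators=10)` replaces the manual steps below:

```python
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier

# Toy feature vectors with a missing value, as SKLearnModel would see them.
fvs = pd.DataFrame({
    "feature_vectors": [[0.9, np.nan], [0.1, 0.2], [0.8, 0.7], [0.0, 0.1]],
    "label": [1.0, 0.0, 1.0, 0.0],
})

# Steps SKLearnModel performs internally, per its nan_fill / use_floats options:
X = np.array(fvs["feature_vectors"].tolist(), dtype=np.float32)  # use_floats=True
X = np.nan_to_num(X, nan=0.0)                                    # nan_fill=0.0
clf = RandomForestClassifier(n_estimators=10, random_state=0).fit(X, fvs["label"])
preds = clf.predict(X)
```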
- entropy(df, vector_col: str, output_col: str)[source]
Calculate entropy of predictions.
- Parameters:
df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors
vector_col (str) – Name of the column containing feature vectors
output_col (str) – Name of the column to store entropy values in
- Returns:
The input DataFrame with entropy values added in the output_col
- Return type:
pandas.DataFrame or pyspark.sql.DataFrame
- property nan_fill
Value to use for filling NaN values in feature vectors.
- Returns:
The value to use for filling NaN values, or None if no filling is needed
- Return type:
float or None
- params_dict()[source]
Get a dictionary of model parameters.
- Returns:
Dictionary containing model parameters and configuration
- Return type:
dict
- predict(df: pd.DataFrame | SparkDataFrame, vector_col: str, output_col: str) → pd.DataFrame | SparkDataFrame[source]
Make predictions using the trained model.
- Parameters:
df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors to predict on
vector_col (str) – Name of the column containing feature vectors
output_col (str) – Name of the column to store predictions in
- Returns:
The input DataFrame with predictions added in the output_col
- Return type:
pandas.DataFrame or pyspark.sql.DataFrame
- predict_with_confidence(df: pd.DataFrame | SparkDataFrame, vector_col: str, prediction_col: str, confidence_col: str) → pd.DataFrame | SparkDataFrame[source]
Make predictions and confidence scores using the trained model.
This method is more efficient than calling predict() and prediction_conf() separately as it computes both in a single pass when possible.
- Parameters:
df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors to predict on
vector_col (str) – Name of the column containing feature vectors
prediction_col (str) – Name of the column to store predictions in
confidence_col (str) – Name of the column to store confidence scores in
- Returns:
The input DataFrame with predictions and confidence scores added
- Return type:
pandas.DataFrame or pyspark.sql.DataFrame
- prediction_conf(df, vector_col: str, output_col: str)[source]
Calculate prediction confidence scores.
- Parameters:
df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors
vector_col (str) – Name of the column containing feature vectors
output_col (str) – Name of the column to store confidence scores in
- Returns:
The input DataFrame with confidence scores added
- Return type:
pandas.DataFrame or pyspark.sql.DataFrame
- train(df, vector_col: str, label_column: str)[source]
Train the model on the given data.
- Parameters:
df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing training data
vector_col (str) – Name of the column containing feature vectors
label_column (str) – Name of the column containing labels
- Returns:
The trained model (self)
- Return type:
MLModel
- property trained_model
The trained ML Model object
- Returns:
The trained ML Model object
- Return type:
- property use_floats
Whether the model uses float32 or float64 precision.
- Returns:
True if the model uses float32, False if it uses float64
- Return type:
bool
- property use_vectors
Whether the model expects feature vectors in vector format.
- Returns:
True if the model expects vectors, False if it expects arrays
- Return type:
bool
- class MatchFlow.SparkMLModel(model, nan_fill=0.0, **model_args)[source]
Bases: MLModel
PySpark ML model wrapper.
- entropy(df, vector_col: str, output_col: str)[source]
Calculate entropy of predictions.
- Parameters:
df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors
vector_col (str) – Name of the column containing feature vectors
output_col (str) – Name of the column to store entropy values in
- Returns:
The input DataFrame with entropy values added in the output_col
- Return type:
pandas.DataFrame or pyspark.sql.DataFrame
- property nan_fill
Value to use for filling NaN values in feature vectors.
- Returns:
The value to use for filling NaN values, or None if no filling is needed
- Return type:
float or None
- params_dict()[source]
Get a dictionary of model parameters.
- Returns:
Dictionary containing model parameters and configuration
- Return type:
dict
- predict(df: pd.DataFrame | SparkDataFrame, vector_col: str, output_col: str) → pd.DataFrame | SparkDataFrame[source]
Make predictions using the trained model.
- Parameters:
df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors to predict on
vector_col (str) – Name of the column containing feature vectors
output_col (str) – Name of the column to store predictions in
- Returns:
The input DataFrame with predictions added in the output_col
- Return type:
pandas.DataFrame or pyspark.sql.DataFrame
- predict_with_confidence(df: pd.DataFrame | SparkDataFrame, vector_col: str, prediction_col: str, confidence_col: str) → pd.DataFrame | SparkDataFrame[source]
Make predictions and confidence scores using the trained model.
This method is more efficient than calling predict() and prediction_conf() separately as it computes both in a single pass when possible.
- Parameters:
df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors to predict on
vector_col (str) – Name of the column containing feature vectors
prediction_col (str) – Name of the column to store predictions in
confidence_col (str) – Name of the column to store confidence scores in
- Returns:
The input DataFrame with predictions and confidence scores added
- Return type:
pandas.DataFrame or pyspark.sql.DataFrame
- prediction_conf(df, vector_col: str, output_col: str)[source]
Calculate prediction confidence scores.
- Parameters:
df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing the feature vectors
vector_col (str) – Name of the column containing feature vectors
output_col (str) – Name of the column to store confidence scores in
- Returns:
The input DataFrame with confidence scores added
- Return type:
pandas.DataFrame or pyspark.sql.DataFrame
- train(df, vector_col: str, label_column: str)[source]
Train the model on the given data.
- Parameters:
df (pandas.DataFrame or pyspark.sql.DataFrame) – The DataFrame containing training data
vector_col (str) – Name of the column containing feature vectors
label_column (str) – Name of the column containing labels
- Returns:
The trained model (self)
- Return type:
MLModel
- property trained_model
The trained ML Model object
- Returns:
The trained ML Model object
- Return type:
- property use_floats
Whether the model uses float32 or float64 precision.
- Returns:
True if the model uses float32, False if it uses float64
- Return type:
bool
- property use_vectors
Whether the model expects feature vectors in vector format.
- Returns:
True if the model expects vectors, False if it expects arrays
- Return type:
bool
- class MatchFlow.Tokenizer[source]
Bases: ABC
- out_col_name(input_col)[source]
the name of the output column from the tokenizer, e.g. for a 3-gram tokenizer applied to the name column, the output column could be "3gram(name)"
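A sketch of what a q-gram tokenizer produces (a hypothetical helper, not MatchFlow's QGramTokenizer) and the kind of column name out_col_name would report:

```python
# Hypothetical 3-gram tokenizer, for illustration only.
def qgrams(s: str, q: int = 3) -> list[str]:
    """Return the overlapping character q-grams of s."""
    return [s[i:i + q] for i in range(len(s) - q + 1)]

tokens = qgrams("iphone")      # ['iph', 'pho', 'hon', 'one']
output_column = "3gram(name)"  # naming convention suggested by out_col_name
```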
- class MatchFlow.WebUILabeler(a_df, b_df, id_col: str = '_id', flask_port: int = 5005, streamlit_port: int = 8501, flask_host: str = '127.0.0.1')[source]
Bases: Labeler
Web interface for labeling pairs of records.
- Parameters:
a_df (Union[pd.DataFrame, SparkDataFrame])
b_df (Union[pd.DataFrame, SparkDataFrame])
id_col (str, default '_id')
flask_port (int, default 5005)
streamlit_port (int, default 8501)
flask_host (str, default '127.0.0.1')
- MatchFlow.apply_matcher(model: MLModel, df: pd.DataFrame | SparkDataFrame, feature_col: str, prediction_col: str, confidence_col: str | None = None) → pd.DataFrame | SparkDataFrame[source]
Apply a trained model to make predictions.
- Parameters:
model (MLModel) – A trained MLModel instance
df (Union[pd.DataFrame, SparkDataFrame]) – The DataFrame to make predictions on
feature_col (str) – Name of the column containing feature vectors
prediction_col (str) – Name of the column to store predictions in
confidence_col (str, optional) – Name of the column to store confidence scores in. If provided, both predictions and confidence scores will be computed efficiently in a single pass.
- Returns:
The input DataFrame with predictions added (and confidence scores if requested)
- Return type:
Union[pd.DataFrame, SparkDataFrame]
- MatchFlow.create_features(A: pd.DataFrame | SparkDataFrame, B: pd.DataFrame | SparkDataFrame, a_cols: List[str], b_cols: List[str], sim_functions: List[Callable[[...], Any]] | None = None, tokenizers: List[Callable[[...], Any]] | None = None, null_threshold: float = 0.5) → List[Callable][source]
creates the features which will be used to featurize your tuple pairs
- Parameters:
A (Union[pd.DataFrame, SparkDataFrame]) – the records of table A
B (Union[pd.DataFrame, SparkDataFrame]) – the records of table B
a_cols (list) – The names of the columns for DataFrame A that should have features generated
b_cols (list) – The names of the columns for DataFrame B that should have features generated
sim_functions (list of callables, optional) – similarity functions to apply (default: None)
tokenizers (list of callables, optional) – tokenizers to use (default: None)
null_threshold (float) – the portion of values that must be null in order for the column to be dropped and not considered for feature generation
- Returns:
a list containing initialized feature objects for columns in A, B
- Return type:
List[Callable]
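A sketch of the inputs to create_features (hypothetical tables and column names):

```python
import pandas as pd

# Hypothetical input tables; '_id' is the default id column used by the labelers.
A = pd.DataFrame({"_id": [0, 1], "name": ["apple iphone 12", "samsung galaxy s21"]})
B = pd.DataFrame({"_id": [10, 11], "name": ["iphone 12 (apple)", "galaxy s21 5g"]})

# With MatchFlow installed:
#   features = MatchFlow.create_features(A, B, a_cols=["name"], b_cols=["name"])
```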
- MatchFlow.create_seeds(fvs: pd.DataFrame | SparkDataFrame, nseeds: int, labeler: Labeler, score_column: str = 'score', parquet_file_path: str = 'active-matcher-training-data.parquet') → pd.DataFrame | SparkDataFrame[source]
Create labeled seed examples for active learning.
- Parameters:
fvs (Union[pd.DataFrame, SparkDataFrame]) – DataFrame containing feature vectors with scores
nseeds (int) – the number of seeds you want to use to train an initial model
labeler (Labeler) – the labeler object you want to use to assign labels to rows
score_column (str, default='score') – the name of the score column in your fvs DataFrame
parquet_file_path (str, default='active-matcher-training-data.parquet') – The path to save the labeled data to
- Returns:
A DataFrame with labeled seeds; its schema is the schema of fvs plus a label column whose values are either 0.0 or 1.0
- Return type:
Union[pd.DataFrame, SparkDataFrame]
- MatchFlow.down_sample(fvs: pd.DataFrame | SparkDataFrame, percent: float, search_id_column: str, score_column: str = 'score', bucket_size: int = 1000) → pd.DataFrame | SparkDataFrame[source]
down sample by score_column to produce percent * fvs.count() rows
- Parameters:
fvs (Union[pd.DataFrame, SparkDataFrame]) – the feature vectors to be downsampled
percent (float) – the portion of the vectors to be output, (0.0, 1.0]
search_id_column (str) – the name of the column containing unique identifiers for each record
score_column (str) – the column that scored the vectors, should be positively correlated with the probability of the pair being a match
bucket_size (int, default 1000) – the size of the buckets for partitioning
- Returns:
the down sampled dataset with percent * fvs.count() rows with the same schema as fvs
- Return type:
Union[pd.DataFrame, SparkDataFrame]
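For intuition only — the real implementation partitions records into score buckets of bucket_size, whereas this toy pandas sketch simply keeps the highest-scoring fraction:

```python
import pandas as pd

# Toy score distribution over 10 candidate vectors.
fvs = pd.DataFrame({"search_id": range(10), "score": [0.1 * i for i in range(10)]})

percent = 0.3
kept = fvs.nlargest(int(percent * len(fvs)), "score")  # 3 highest-scoring rows
```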
- MatchFlow.featurize(features: List[Callable], A, B, candidates, output_col: str = 'feature_vectors', fill_na: float = 0.0) → pd.DataFrame | SparkDataFrame[source]
applies the featurizer to the record pairs in candidates
- Parameters:
features (List[Callable]) – a list containing initialized feature objects for columns in A, B
A (Union[pd.DataFrame, SparkDataFrame]) – the records of table A
B (Union[pd.DataFrame, SparkDataFrame]) – the records of table B
candidates (Union[pd.DataFrame, SparkDataFrame]) – id pairs of A and B that are potential match candidates, with the following columns:
- id2: id from table B
- id1_list: list of candidate ids from table A
output_col (str) – the name of the column for the resulting feature vectors, default feature_vectors
fill_na (float) – value to fill in for missing data, default 0.0
- Returns:
DataFrame with feature vectors created with the following schema: (id2, id1, output_col, other columns from candidates). Returns pandas DataFrame if inputs A and B are pandas DataFrames, otherwise returns Spark DataFrame.
- Return type:
Union[pd.DataFrame, SparkDataFrame]
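A sketch of the candidates schema that featurize expects (hypothetical ids):

```python
import pandas as pd

# Each row pairs one id from table B with a list of candidate ids from table A.
candidates = pd.DataFrame({
    "id2": [10, 11],
    "id1_list": [[0, 1], [1]],
})
# With MatchFlow installed:
#   fvs = MatchFlow.featurize(features, A, B, candidates)
```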
- MatchFlow.get_base_sim_functions()[source]
get the base similarity functions
- Returns:
a list of similarity functions, currently includes:
- TFIDFFeature
- JaccardFeature
- SIFFeature
- OverlapCoeffFeature
- CosineFeature
- Return type:
list
- MatchFlow.get_base_tokenizers()[source]
get the base tokenizers
- Returns:
a list of tokenizers, currently includes:
- StrippedWhiteSpaceTokenizer
- NumericTokenizer
- QGramTokenizer(3)
- Return type:
list
- MatchFlow.get_extra_tokenizers()[source]
get the extra tokenizers
- Returns:
a list of extra tokenizers, currently includes:
- AlphaNumericTokenizer
- QGramTokenizer(5)
- StrippedQGramTokenizer(3)
- StrippedQGramTokenizer(5)
- Return type:
list
- MatchFlow.label_data(model: MLModel, mode: Literal['batch', 'continuous'], labeler: Labeler, fvs: pd.DataFrame | SparkDataFrame, seeds: pd.DataFrame | SparkDataFrame | None = None, parquet_file_path: str = 'active-matcher-training-data.parquet', **learner_kwargs) → pd.DataFrame | SparkDataFrame[source]
Generate labeled data using active learning.
- Parameters:
model (MLModel) – An MLModel instance
mode (Literal["batch", "continuous"]) – Whether to use batch or continuous active learning
labeler (Labeler) – A Labeler instance
fvs (Union[pd.DataFrame, SparkDataFrame]) – The data that needs to be labeled
seeds (Union[pd.DataFrame, SparkDataFrame], optional) – Initial labeled examples to start with
parquet_file_path (str, default='active-matcher-training-data.parquet') – The path to save the labeled data to
**learner_kwargs – Additional keyword arguments to pass to the active learner constructor. For batch mode, see EntropyActiveLearner (e.g. batch_size, max_iter). For continuous mode, see ContinuousEntropyActiveLearner (e.g. queue_size, max_labeled, on_demand_stop).
- Returns:
DataFrame with ids of potential matches and the corresponding label
- Return type:
Union[pd.DataFrame, SparkDataFrame]
- MatchFlow.label_pairs(labeler: Labeler, pairs: pd.DataFrame | SparkDataFrame) → pd.DataFrame | SparkDataFrame[source]
Label pairs without active learning.
- Parameters:
labeler (Labeler) – A Labeler instance
pairs (Union[pd.DataFrame, SparkDataFrame]) – The pairs to label
- Returns:
DataFrame with labeled pairs
- Return type:
Union[pd.DataFrame, SparkDataFrame]
- MatchFlow.load_dataframe(path, df_type)[source]
Load a DataFrame from disk based on the specified type.
- Parameters:
path (str) – Path to the saved DataFrame file
df_type (str) – Type of DataFrame to load (‘pandas’ or ‘sparkdf’)
- Returns:
Loaded DataFrame
- Return type:
Union[pd.DataFrame, pyspark.sql.DataFrame]
- MatchFlow.load_features(path)[source]
Load a list of feature objects from disk using pickle deserialization.
- Parameters:
path (str) – Path to the saved features file
- Returns:
List of loaded feature objects
- Return type:
List[Callable]
- MatchFlow.save_dataframe(dataframe, path)[source]
Save a DataFrame to disk, automatically detecting whether it’s a pandas or Spark DataFrame.
- Parameters:
dataframe (Union[pd.DataFrame, pyspark.sql.DataFrame]) – DataFrame to save (pandas or Spark)
path (str) – Path where to save the DataFrame
- Return type:
None
- MatchFlow.save_features(features, path)[source]
Save a list of feature objects to disk using pickle serialization.
- Parameters:
features (List[Callable]) – List of feature objects to save
path (str) – Path where to save the features file
- Return type:
None
- MatchFlow.train_matcher(model: MLModel, labeled_data: pd.DataFrame | SparkDataFrame, feature_col: str = 'feature_vectors', label_col: str = 'label') → MLModel[source]
Train a matcher model on labeled data.
- Parameters:
model (MLModel) – An MLModel instance to train
labeled_data (Union[pd.DataFrame, SparkDataFrame]) – DataFrame containing the labeled data
feature_col (str, default="feature_vectors") – Name of the column containing feature vectors
label_col (str, default="label") – Name of the column containing labels
- Returns:
The trained model
- Return type:
MLModel
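Putting train and apply together — a hedged sketch that mirrors the train_matcher/apply_matcher pattern using a bare scikit-learn estimator on toy data. With MatchFlow installed you would instead call `train_matcher(SKLearnModel(LogisticRegression), labeled)` followed by `apply_matcher(model, fvs, "feature_vectors", "prediction", confidence_col="confidence")`:

```python
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression

# Toy labeled feature vectors standing in for MatchFlow's labeled_data.
labeled = pd.DataFrame({
    "feature_vectors": [[1.0, 0.9], [0.0, 0.1], [0.9, 1.0], [0.1, 0.0]],
    "label": [1.0, 0.0, 1.0, 0.0],
})

# "train_matcher": fit the estimator on the labeled vectors.
X = np.array(labeled["feature_vectors"].tolist())
clf = LogisticRegression().fit(X, labeled["label"])

# "apply_matcher": add prediction and confidence columns to unlabeled pairs.
fvs = pd.DataFrame({"feature_vectors": [[0.95, 0.9], [0.05, 0.1]]})
Xn = np.array(fvs["feature_vectors"].tolist())
fvs["prediction"] = clf.predict(Xn)
fvs["confidence"] = clf.predict_proba(Xn).max(axis=1)
```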