
Consolidate Retrieval/Reranking/Instruction Variants #1359

Merged · 19 commits · Nov 13, 2024
10 changes: 4 additions & 6 deletions mteb/abstasks/AbsTaskInstructionRetrieval.py
@@ -13,9 +13,7 @@
from mteb.encoder_interface import Encoder

from ..evaluation.evaluators import utils
from ..evaluation.evaluators.InstructionRetrievalEvaluator import (
InstructionRetrievalEvaluator,
)
from ..evaluation.evaluators.RetrievalEvaluator import RetrievalEvaluator
from .AbsTask import AbsTask, DescriptiveStatistics
from .AbsTaskRetrieval import HFDataLoader

@@ -347,7 +345,7 @@ def load_data(self, **kwargs):

def _evaluate_subset_lang(
self,
retriever: InstructionRetrievalEvaluator,
retriever: RetrievalEvaluator,
corpus: dict,
queries: dict,
og_relevant_docs: dict,
@@ -467,7 +465,7 @@ def evaluate(
encode_kwargs: dict[str, Any] = {},
**kwargs,
) -> dict[str, dict[str, Any]]:
retriever = InstructionRetrievalEvaluator(
retriever = RetrievalEvaluator(
retriever=model,
task_name=self.metadata.name,
encode_kwargs=encode_kwargs,
@@ -523,7 +521,7 @@ def _add_main_score(self, scores: dict[str, dict[str, float]]) -> None:

def _evaluate_subset(
self,
retriever: InstructionRetrievalEvaluator,
retriever: RetrievalEvaluator,
corpus: dict[str, dict[str, str]],
queries: dict[str, str],
relevant_docs: dict[str, dict[str, int]],
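The net effect of this file's diff is that instruction retrieval now builds the shared RetrievalEvaluator instead of a dedicated InstructionRetrievalEvaluator. A minimal sketch of the call shape taken from the hunks above; the wrapper function and its arguments are illustrative, not part of the PR:

```python
from typing import Any

from mteb.encoder_interface import Encoder
from mteb.evaluation.evaluators.RetrievalEvaluator import RetrievalEvaluator


def build_retriever(
    model: Encoder, task_name: str, encode_kwargs: dict[str, Any]
) -> RetrievalEvaluator:
    # Mirrors the constructor call in evaluate() above; any extra kwargs would
    # be forwarded the same way as in the diff.
    return RetrievalEvaluator(
        retriever=model,
        task_name=task_name,
        encode_kwargs=encode_kwargs,
    )
```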
280 changes: 199 additions & 81 deletions mteb/abstasks/AbsTaskReranking.py
@@ -1,98 +1,216 @@
from __future__ import annotations

from typing import Any
import logging
from collections import defaultdict

import datasets
import tqdm
from datasets import Dataset

from mteb.encoder_interface import Encoder
from mteb.load_results.task_results import ScoresDict

from ..evaluation.evaluators import RerankingEvaluator
from .AbsTask import AbsTask, DescriptiveStatistics


class RerankingDescriptiveStatistics(DescriptiveStatistics):
"""Descriptive statistics for Reranking

Attributes:
num_samples: number of samples in the dataset.
num_positive: Number of positive examples
num_negative: Number of negative examples
avg_query_len: Average length of queries
avg_positive_len: Average length of positive examples
avg_negative_len: Average length of negative examples
"""

num_samples: int
num_positive: int
num_negative: int
avg_query_len: float
avg_positive_len: float
avg_negative_len: float


class AbsTaskReranking(AbsTask):
"""Abstract class for re-ranking experiments.

from ..load_results.task_results import ScoresDict
from .AbsTaskRetrieval import AbsTaskRetrieval

logger = logging.getLogger(__name__)

OLD_FORMAT_RERANKING_TASKS = [
"AskUbuntuDupQuestions",
"MindSmallReranking",
"SciDocsRR",
"StackOverflowDupQuestions",
"WebLINXCandidatesReranking",
"AlloprofReranking",
"SyntecReranking",
"VoyageMMarcoReranking",
"ESCIReranking",
"MIRACLReranking",
"WikipediaRerankingMultilingual",
"RuBQReranking",
"T2Reranking",
"MMarcoReranking",
"CMedQAv1-reranking",
"CMedQAv2-reranking",
]


class AbsTaskReranking(AbsTaskRetrieval):
"""Abstract class for re-ranking experiments. This is mostly the same as the RetrievalEvaluator, but as previously it wasn't we need to keep it to transform old dataset versions into the same format.

New Format:
-----------
Same as AbsTaskRetrieval, but with an additional top_ranked file that contains the passages to rerank. The loaded data should populate the following attributes:

self.corpus: dict[str, dict[str, str]]
Semantically, it should contain dict[split_name, dict[sample_id, dict[str, str]]]
E.g. {"test": {"document_one": {"_id": "d1", "title": "title", "text": "text"}}}

self.queries: dict[str, dict[str, Union[str, list[str]]]]
Semantically, it should contain dict[split_name, dict[sample_id, str]] or dict[split_name, dict[sample_id, list[str]]] for conversations
E.g. {"test": {"q1": "query"}}
or {"test": {"q1": ["turn1", "turn2", "turn3"]}}

self.relevant_docs: dict[str, dict[str, dict[str, int]]]
Semantically, it should contain dict[split_name, dict[sample_id, dict[doc_id, score]]]
E.g.: {"test": {"q1": {"document_one": 1}}}

self.top_ranked: dict[str, dict[str, list[str]]] or dict[str, dict[str, dict[str, float]]]
Semantically, it should contain dict[split_name, dict[sample_id, list[doc_id]]] or dict[split_name, dict[sample_id, dict[doc_id, score]]]
E.g.: {"test": {"q1": ["document_one", "document_two"]}} or {"test": {"q1": {"document_one": 1, "document_two": 0.5}}}

Old Format:
-----------
self.load_data() must generate a huggingface dataset with a split matching self.metadata_dict["eval_splits"], and assign it to self.dataset. It must contain the following columns:
query: str
positive: list[str]
negative: list[str]
"""

def __init__(self, **kwargs):
super().__init__(**kwargs)
super(AbsTaskRetrieval, self).__init__(**kwargs)

def load_data(self, **kwargs):
if self.data_loaded:
return

if self.metadata.name in OLD_FORMAT_RERANKING_TASKS:
self.dataset = datasets.load_dataset(**self.metadata_dict["dataset"]) # type: ignore
self.dataset_transform()
else:
# use AbsTaskRetrieval default to load the data
# TODO: need to make sure top_ranked comes back
return super().load_data(**kwargs)

def process_example(self, example: dict, split: str, query_idx: int) -> dict:
"""Process a single example from the dataset."""
query = example["query"]
positive_docs = example["positive"]
negative_docs = example["negative"]

query_id = f"{split}_query{query_idx}"

# Initialize the structures for this example
example_data = {
"query_id": query_id,
"query": query,
"doc_ids": [],
"doc_texts": [],
"relevance_scores": [],
}

for i, pos_doc in enumerate(sorted(positive_docs)):
doc_id = f"{query_id}_positive_{i}"
example_data["doc_ids"].append(doc_id)
example_data["doc_texts"].append(pos_doc)
example_data["relevance_scores"].append(1)

for i, neg_doc in enumerate(sorted(negative_docs)):
doc_id = f"{query_id}_negative_{i}"
example_data["doc_ids"].append(doc_id)
example_data["doc_texts"].append(neg_doc)
example_data["relevance_scores"].append(0)

return example_data

def dataset_transform(self):
"""Transform the old format to the new format using HF datasets mapping."""
if self.metadata.name not in OLD_FORMAT_RERANKING_TASKS:
return

logger.info(
f"Transforming old format to standard format for {self.metadata.name}"
)

self.corpus = defaultdict(lambda: defaultdict(dict))
self.queries = defaultdict(lambda: defaultdict(dict))
self.relevant_docs = defaultdict(lambda: defaultdict(lambda: defaultdict(dict)))
self.top_ranked = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))

hf_subsets = list(self.hf_subsets) if self.is_multilingual else ["default"]

for hf_subset in hf_subsets:
cur_dataset = self.dataset[hf_subset]

for split in cur_dataset:
# Create an enumerated dataset to pass indices
enumerated_dataset = Dataset.from_dict(
{
"index": range(len(cur_dataset[split])),
"query": cur_dataset[split]["query"],
"positive": cur_dataset[split]["positive"],
"negative": cur_dataset[split]["negative"],
}
)

# Map the transformation function over the dataset
processed_dataset = enumerated_dataset.map(
lambda example, idx: self.process_example(example, split, idx),
with_indices=True,
remove_columns=enumerated_dataset.column_names,
)

# Populate the data structures
for item in processed_dataset:
query_id = item["query_id"]
self.queries[hf_subset][split][query_id] = item["query"]

# Add documents and relevance information
for doc_id, doc_text, relevance in zip(
item["doc_ids"], item["doc_texts"], item["relevance_scores"]
):
self.corpus[hf_subset][split][doc_id] = {
"text": doc_text,
"_id": doc_id,
}
self.top_ranked[hf_subset][split][query_id].append(doc_id)
self.relevant_docs[hf_subset][split][query_id][doc_id] = (
relevance
)

self.instructions = None
self.data_loaded = True

def _evaluate_subset(
self,
model: Encoder,
data_split: Dataset,
*,
encode_kwargs: dict[str, Any] = {},
**kwargs: Any,
self, retriever, corpus, queries, relevant_docs, hf_subset: str, **kwargs
) -> ScoresDict:
evaluator = RerankingEvaluator(
data_split,
task_name=self.metadata.name,
encode_kwargs=encode_kwargs,
"""Evaluate each query_id as a "mini" retrieval corpus, and rerank the top-ranked documents for each query_id."""
all_results = defaultdict(dict)
max_docs = 0
top_ranked = kwargs["top_ranked"] # must be present for reranking
for query_id in tqdm.tqdm(
list(queries.keys()), leave=False, desc="Reranking over query ids..."
):
cur_queries = {query_id: queries[query_id]}
if "instructions" in kwargs:
instructions = kwargs["instructions"]
cur_instructions = {queries[query_id]: instructions[queries[query_id]]}
else:
cur_instructions = None

doc_ids_to_rerank = top_ranked[query_id]
cur_corpus = {doc_id: corpus[doc_id] for doc_id in doc_ids_to_rerank}
if (
len(cur_corpus) > max_docs
): # use this to make sure we get the correct MAP/MRR at max length
max_docs = len(cur_corpus)

# to handle instruction-based reranking we pass both query_id and instructions (unused if not instruction-based)
results = retriever(
cur_corpus,
cur_queries,
instructions=cur_instructions,
query_id=query_id,
)
# results should have only one key, the query_id
all_results[query_id] = results[query_id]

# do the evaluation like normal now, but pass our results
if max_docs > max(retriever.k_values):
retriever.k_values += [max_docs]
return super()._evaluate_subset(
retriever,
corpus,
queries,
relevant_docs,
hf_subset,
results=all_results,
**kwargs,
)
scores = evaluator(model)

self._add_main_score(scores)
return scores

def _add_main_score(self, scores: ScoresDict) -> None:
scores["main_score"] = scores[self.metadata.main_score]

def _calculate_metrics_from_split(
self, split: str, hf_subset: str | None = None, compute_overall: bool = False
) -> RerankingDescriptiveStatistics:
if hf_subset:
query = self.dataset[hf_subset][split]["query"]
positive = self.dataset[hf_subset][split]["positive"]
negative = self.dataset[hf_subset][split]["negative"]
elif compute_overall:
query = []
positive = []
negative = []
for hf_subset in self.metadata.eval_langs:
query.extend(self.dataset[hf_subset][split]["query"])
positive.extend(self.dataset[hf_subset][split]["positive"])
negative.extend(self.dataset[hf_subset][split]["negative"])
else:
query = self.dataset[split]["query"]
positive = self.dataset[split]["positive"]
negative = self.dataset[split]["negative"]

total_len_query = sum([len(q) for q in query])
total_len_positive = sum([len(p) for p in positive])
total_len_negative = sum([len(n) for n in negative])
return RerankingDescriptiveStatistics(
num_samples=len(query),
num_positive=len(positive),
num_negative=len(negative),
avg_query_len=total_len_query / len(query),
avg_positive_len=total_len_positive / len(positive),
avg_negative_len=total_len_negative / len(negative),
)
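To make the old-to-new conversion in process_example/dataset_transform above concrete, here is a standalone toy sketch that applies the same ID scheme ({split}_query{idx} plus _positive_{i}/_negative_{i} suffixes). The sample data is invented, and the real task nests each of these dicts by hf_subset and split:

```python
from collections import defaultdict

# Invented old-format sample: one query with its positive and negative passages.
old_format = [
    {
        "query": "what is mteb?",
        "positive": ["MTEB is a benchmark."],
        "negative": ["Paris is in France."],
    },
]

split = "test"
corpus, queries = {}, {}
relevant_docs, top_ranked = defaultdict(dict), defaultdict(list)

for idx, example in enumerate(old_format):
    query_id = f"{split}_query{idx}"
    queries[query_id] = example["query"]
    for label, docs, score in (
        ("positive", example["positive"], 1),
        ("negative", example["negative"], 0),
    ):
        for i, text in enumerate(sorted(docs)):
            doc_id = f"{query_id}_{label}_{i}"
            corpus[doc_id] = {"_id": doc_id, "text": text}
            top_ranked[query_id].append(doc_id)
            relevant_docs[query_id][doc_id] = score

# queries       -> {"test_query0": "what is mteb?"}
# corpus        -> {"test_query0_positive_0": {...}, "test_query0_negative_0": {...}}
# relevant_docs -> {"test_query0": {"test_query0_positive_0": 1, "test_query0_negative_0": 0}}
# top_ranked    -> {"test_query0": ["test_query0_positive_0", "test_query0_negative_0"]}
```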
28 changes: 23 additions & 5 deletions mteb/abstasks/AbsTaskRetrieval.py
@@ -232,12 +232,16 @@ class AbsTaskRetrieval(AbsTask):
ignore_identical_ids: bool = False

def __init__(self, **kwargs):
super().__init__(**kwargs)
try:
super(AbsTask, self).__init__(**kwargs)
except Exception:
super().__init__(**kwargs)

def load_data(self, **kwargs):
if self.data_loaded:
return
self.corpus, self.queries, self.relevant_docs = {}, {}, {}
self.instructions, self.top_ranked = None, None
dataset_path = self.metadata_dict["dataset"]["path"]
hf_repo_qrels = (
dataset_path + "-qrels" if "clarin-knext" in dataset_path else None
@@ -289,12 +293,21 @@ def evaluate(
self.queries[split],
self.relevant_docs[split],
)
if self.top_ranked is not None:
kwargs["top_ranked"] = self.top_ranked[split]
if self.instructions is not None:
kwargs["instructions"] = self.instructions[split]
else:
corpus, queries, relevant_docs = (
self.corpus[hf_subset][split],
self.queries[hf_subset][split],
self.relevant_docs[hf_subset][split],
)
if self.top_ranked is not None:
kwargs["top_ranked"] = self.top_ranked[hf_subset][split]
if self.instructions is not None:
kwargs["instructions"] = self.instructions[hf_subset][split]

scores[hf_subset] = self._evaluate_subset(
retriever, corpus, queries, relevant_docs, hf_subset, **kwargs
)
@@ -303,10 +316,15 @@
def _evaluate_subset(
self, retriever, corpus, queries, relevant_docs, hf_subset: str, **kwargs
) -> ScoresDict:
start_time = time()
results = retriever(corpus, queries)
end_time = time()
logger.info(f"Time taken to retrieve: {end_time - start_time:.2f} seconds")
if "results" in kwargs:
# reranking has already been done
results = kwargs["results"]
else:
# perform the retrieval here
start_time = time()
results = retriever(corpus, queries)
end_time = time()
logger.info(f"Time taken to retrieve: {end_time - start_time:.2f} seconds")

save_predictions = kwargs.get("save_predictions", False)
export_errors = kwargs.get("export_errors", False)
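Taken together with the AbsTaskReranking changes above, the new results passthrough means reranking runs one "mini" retrieval per query over its top_ranked candidates and then hands the pooled scores back so the parent class skips full-corpus retrieval. Below is a rough standalone sketch of that control flow; the scoring function is a stand-in and none of this is mteb's actual API:

```python
from collections import defaultdict


def fake_score(query: str, doc: str) -> float:
    # Stand-in for model-based relevance scoring, only to illustrate the flow.
    return float(len(set(query.lower().split()) & set(doc.lower().split())))


def rerank_per_query(corpus, queries, top_ranked):
    # One "mini retrieval" per query over its top-ranked candidates; the pooled
    # results would then be passed downstream as `results=all_results`, so the
    # retrieval step in AbsTaskRetrieval._evaluate_subset is skipped.
    all_results = defaultdict(dict)
    for query_id, query in queries.items():
        for doc_id in top_ranked[query_id]:  # only rerank these documents
            all_results[query_id][doc_id] = fake_score(query, corpus[doc_id]["text"])
    return all_results


corpus = {"d1": {"text": "MTEB is a benchmark."}, "d2": {"text": "Paris is in France."}}
queries = {"q1": "what is mteb"}
top_ranked = {"q1": ["d1", "d2"]}
print(dict(rerank_per_query(corpus, queries, top_ranked)))
# {'q1': {'d1': 2.0, 'd2': 1.0}}
```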