Instructions to use nikraf/directionality_probe with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use nikraf/directionality_probe with Transformers:
# Use a pipeline as a high-level helper from transformers import pipeline pipe = pipeline("feature-extraction", model="nikraf/directionality_probe", trust_remote_code=True)# Load model directly from transformers import AutoModel model = AutoModel.from_pretrained("nikraf/directionality_probe", trust_remote_code=True, dtype="auto") - Notebooks
- Google Colab
- Kaggle
| import entrypoint_setup | |
| import argparse | |
| import copy | |
| import random | |
| import time | |
| from pathlib import Path | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import pandas as pd | |
| import seaborn as sns | |
| import torch | |
| from tqdm.auto import tqdm | |
| from transformers import AutoModelForMaskedLM | |
| def set_seed(seed: int): | |
| random.seed(seed) | |
| torch.manual_seed(seed) | |
| torch.cuda.manual_seed(seed) | |
| torch.cuda.manual_seed_all(seed) | |
| np.random.seed(seed) | |
| SUPPORTED_BACKENDS = ("sdpa", "flex", "kernels_flash") | |
| class ThroughputChecker: | |
| def __init__( | |
| self, | |
| warmup_batches: int = 10, | |
| timed_batches: int = 100, | |
| ): | |
| self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| self.warmup_batches = warmup_batches | |
| self.timed_batches = timed_batches | |
| self.canonical_amino_acids = "ACDEFGHIKLMNPQRSTVWY" | |
| def _load_model(self, model_path: str): | |
| model = AutoModelForMaskedLM.from_pretrained( | |
| model_path, | |
| dtype=torch.bfloat16, | |
| device_map=self.device, | |
| trust_remote_code=True, | |
| ).eval() | |
| return model | |
| def _generate_random_sequence(self, length: int) -> str: | |
| return "M" + "".join(random.choices(self.canonical_amino_acids, k=length - 1)) | |
| def _generate_random_batch(self, batch_size: int, min_length: int, max_length: int) -> list[str]: | |
| max_length_example = self._generate_random_sequence(max_length) | |
| return [max_length_example] + [ | |
| self._generate_random_sequence(random.randint(min_length, max_length)) | |
| for _ in range(batch_size - 1) | |
| ] | |
| def _time(self, model, tokenizer, batch_size: int, min_length: int, max_length: int): | |
| model = model.to(self.device).eval() | |
| set_seed(42) | |
| min_dynamic_warmup_batches = self.warmup_batches | |
| max_dynamic_warmup_batches = self.warmup_batches * 10 | |
| stability_window = 3 | |
| relative_stability_tolerance = 0.10 | |
| def synchronize(): | |
| if self.device.type == "cuda": | |
| torch.cuda.synchronize() | |
| def run_one_batch() -> int: | |
| batch = self._generate_random_batch(batch_size, min_length, max_length) | |
| tokenized = tokenizer( | |
| batch, | |
| return_tensors="pt", | |
| padding="max_length", | |
| max_length=max_length, | |
| truncation=True, | |
| add_special_tokens=True, | |
| ) | |
| input_ids = tokenized["input_ids"] | |
| if "attention_mask" in tokenized: | |
| nonpad_tokens_this = tokenized["attention_mask"].sum().item() | |
| else: | |
| pad_token_id = tokenizer.pad_token_id | |
| if pad_token_id is not None: | |
| nonpad_tokens_this = (input_ids != pad_token_id).sum().item() | |
| else: | |
| nonpad_tokens_this = input_ids.numel() | |
| tokenized = {k: v.to(self.device) for k, v in tokenized.items()} | |
| _ = model(**tokenized, output_hidden_states=True) | |
| return nonpad_tokens_this | |
| def time_batches(num_batches: int, message: str): | |
| processed_tokens = 0 | |
| synchronize() | |
| start_time = time.time() | |
| for _ in tqdm(range(num_batches), desc=message, leave=False): | |
| processed_tokens += run_one_batch() | |
| synchronize() | |
| end_time = time.time() | |
| return end_time - start_time, processed_tokens | |
| # Compile first, then keep warming up until the compiled path stabilizes. | |
| model = torch.compile(model) | |
| warmup_latencies = [] | |
| for warmup_idx in tqdm(range(max_dynamic_warmup_batches), desc="Warmup (dynamic)", leave=False): | |
| synchronize() | |
| warmup_start = time.time() | |
| _ = run_one_batch() | |
| synchronize() | |
| warmup_latency = time.time() - warmup_start | |
| warmup_latencies.append(warmup_latency) | |
| if warmup_idx + 1 < min_dynamic_warmup_batches: | |
| continue | |
| if len(warmup_latencies) < 2 * stability_window: | |
| continue | |
| previous_window = warmup_latencies[-2 * stability_window:-stability_window] | |
| current_window = warmup_latencies[-stability_window:] | |
| previous_mean = sum(previous_window) / stability_window | |
| current_mean = sum(current_window) / stability_window | |
| assert previous_mean > 0.0, "Warmup latency mean should be positive." | |
| relative_change = abs(current_mean - previous_mean) / previous_mean | |
| if relative_change <= relative_stability_tolerance: | |
| break | |
| time_taken, timed_tokens_sum = time_batches(self.timed_batches, "Timed") | |
| if self.device.type == "cuda": | |
| torch.cuda.empty_cache() | |
| return time_taken, timed_tokens_sum | |
| def evaluate(self, model_path: str, batch_sizes: list[int], min_length: int, sequence_lengths: list[int], backends: list[str]): | |
| results = {backend: {} for backend in backends} | |
| original_model = self._load_model(model_path) | |
| tokenizer = original_model.tokenizer | |
| for backend in backends: | |
| print(f"Benchmarking {model_path} with backend={backend}") | |
| try: | |
| backend_model = copy.deepcopy(original_model) | |
| backend_model.attn_backend = backend | |
| except AssertionError as error: | |
| print(f"Skipping backend '{backend}' for {model_path}: {error}") | |
| continue | |
| for bs in batch_sizes: | |
| for max_length in sequence_lengths: | |
| model_copy = copy.deepcopy(backend_model) | |
| time_taken, tokens = self._time( | |
| model_copy, | |
| tokenizer, | |
| bs, | |
| min_length, | |
| max_length, | |
| ) | |
| results[backend][(bs, max_length)] = {"time": time_taken, "tokens": tokens} | |
| original_model.cpu() | |
| del original_model | |
| if self.device.type == "cuda": | |
| torch.cuda.empty_cache() | |
| return results | |
| def plot_results(all_results: dict, output_path: str): | |
| sns.set_theme(style="whitegrid") | |
| plot_data = [] | |
| for model_path, results in all_results.items(): | |
| model_name = Path(model_path).name | |
| for backend in sorted(results.keys()): | |
| for (bs, max_length), entry in results[backend].items(): | |
| time_taken = entry["time"] | |
| nonpad_tokens = entry["tokens"] | |
| tokens_per_sec = nonpad_tokens / time_taken if time_taken > 0 else 0.0 | |
| plot_data.append( | |
| { | |
| "Model": model_name, | |
| "Backend": backend, | |
| "Batch": bs, | |
| "SeqLen": max_length, | |
| "TokensPerSec": tokens_per_sec, | |
| "NonPadTokens": nonpad_tokens, | |
| "Seconds": time_taken, | |
| } | |
| ) | |
| if not plot_data: | |
| return | |
| plot_df = pd.DataFrame(plot_data) | |
| sequence_lengths = sorted(plot_df["SeqLen"].dropna().unique().tolist()) | |
| plot = sns.relplot( | |
| data=plot_df, | |
| x="SeqLen", | |
| y="TokensPerSec", | |
| hue="Backend", | |
| style="Batch", | |
| kind="line", | |
| marker="o", | |
| dashes=False, | |
| col="Model", | |
| col_wrap=1, | |
| height=4.5, | |
| aspect=1.5, | |
| facet_kws={"sharey": False}, | |
| ) | |
| plot.set_titles("{col_name}") | |
| plot.set(xticks=sequence_lengths) | |
| plot.set_axis_labels("Sequence length", "Non-pad tokens/s") | |
| plot.figure.suptitle("Throughput comparison by model") | |
| plot.tight_layout() | |
| plot.figure.subplots_adjust(top=0.93, right=0.95, bottom=0.06) | |
| plot.add_legend(title="Backend / Batch") | |
| plt.savefig(output_path, dpi=300) | |
| print(f"Results saved to {output_path}") | |
| if __name__ == "__main__": | |
| # On Windows, use "%cd%" instead of "${PWD}" to get the current working directory: | |
| # docker run --gpus all -v "%cd%":/workspace fastplms python -m testing.throughput | |
| # On Linux/macOS, keep using ${PWD}: | |
| # docker run --gpus all -v ${PWD}:/workspace fastplms python -m testing.throughput | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument("--hf_token", type=str, default=None) | |
| parser.add_argument( | |
| "--model_paths", | |
| nargs="+", | |
| default=["Synthyra/ESM2-8M", "Synthyra/ESMplusplus_small"], | |
| ) | |
| parser.add_argument("--batch_sizes", nargs="+", type=int, default=[2, 4, 8]) | |
| parser.add_argument("--sequence_lengths", nargs="+", type=int, default=[64, 128, 256, 512, 1024, 2048]) | |
| parser.add_argument("--backends", nargs="+", choices=SUPPORTED_BACKENDS, default=list(SUPPORTED_BACKENDS)) | |
| parser.add_argument("--min_length", type=int, default=32) | |
| parser.add_argument("--warmup_batches", type=int, default=10) | |
| parser.add_argument("--timed_batches", type=int, default=100) | |
| parser.add_argument("--output_path", type=str, default="throughput_comparison.png") | |
| args = parser.parse_args() | |
| if args.hf_token: | |
| from huggingface_hub import login | |
| login(token=args.hf_token) | |
| checker = ThroughputChecker(warmup_batches=args.warmup_batches, timed_batches=args.timed_batches) | |
| all_results = {} | |
| for model_path in args.model_paths: | |
| all_results[model_path] = checker.evaluate( | |
| model_path, | |
| args.batch_sizes, | |
| min_length=args.min_length, | |
| sequence_lengths=args.sequence_lengths, | |
| backends=args.backends, | |
| ) | |
| plot_results(all_results, args.output_path) | |