import logging

import yaml
from pathlib import Path
from typing import Dict

from app.services.base import load_spacy_model
from app.core.config import settings, APP_NAME, SPACY_MODEL_ID
from app.core.exceptions import ServiceError

logger = logging.getLogger(f"{APP_NAME}.services.inclusive_language")


class InclusiveLanguageChecker:
    def __init__(self, rules_directory: str = settings.INCLUSIVE_RULES_DIR):
        self._nlp = None
        self.matcher = None
        self.rules = self._load_inclusive_rules(Path(rules_directory))

    def _load_inclusive_rules(self, rules_path: Path) -> Dict[str, Dict]:
        """
        Load YAML-based inclusive language rules from the given directory.
        """
        if not rules_path.is_dir():
            logger.error(f"Inclusive language rules directory not found: {rules_path}")
            raise ServiceError(
                status_code=500,
                detail=f"Inclusive language rules directory not found: {rules_path}"
            )
        rules = {}
        for yaml_file in rules_path.glob("*.yml"):
            try:
                with yaml_file.open(encoding="utf-8") as f:
                    rule_list = yaml.safe_load(f)
                if not isinstance(rule_list, list):
                    logger.warning(f"Skipping non-list rule file: {yaml_file}")
                    continue
                for rule in rule_list:
                    inconsiderate = rule.get("inconsiderate", [])
                    considerate = rule.get("considerate", [])
                    note = rule.get("note", "")
                    source = rule.get("source", "")
                    rule_type = rule.get("type", "basic")
                    # Ensure consistent formatting: normalise scalar entries to lists
                    if isinstance(considerate, str):
                        considerate = [considerate]
                    if isinstance(inconsiderate, str):
                        inconsiderate = [inconsiderate]
                    for phrase in inconsiderate:
                        rules[phrase.lower()] = {
                            "considerate": considerate,
                            "note": note,
                            "source": source,
                            "type": rule_type
                        }
            except Exception as e:
                logger.error(f"Error loading rule file {yaml_file}: {e}", exc_info=True)
                raise ServiceError(
                    status_code=500,
                    detail=f"Failed to load inclusive language rules: {e}"
                )
        logger.info(f"Loaded {len(rules)} inclusive language rules from {rules_path}")
        return rules
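
    # A sketch of the rule-file shape this loader expects: a YAML list of
    # mappings, one per rule. The schema is inferred from the .get() calls
    # above rather than from any published spec, and the values below are
    # illustrative only. Note that only files matching *.yml are picked up
    # by the glob, so a .yaml extension would be silently ignored.
    #
    #   - type: basic
    #     inconsiderate: chairman        # a string or a list of strings
    #     considerate:
    #       - chairperson
    #       - chair
    #     note: Prefer gender-neutral titles for roles.
    #     source: https://example.org/style-guide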

    def _get_nlp(self):
        """
        Lazy-loads the spaCy model for NLP processing.
        """
        if self._nlp is None:
            self._nlp = load_spacy_model(SPACY_MODEL_ID)
        return self._nlp

    def _init_matcher(self, nlp):
        """
        Initializes spaCy PhraseMatcher using loaded rules.
        """
        from spacy.matcher import PhraseMatcher

        matcher = PhraseMatcher(nlp.vocab, attr="LOWER")
        for phrase in self.rules:
            matcher.add(phrase, [nlp.make_doc(phrase)])
        logger.info(f"PhraseMatcher initialized with {len(self.rules)} phrases.")
        return matcher
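
    # Note on matching semantics: attr="LOWER" makes the PhraseMatcher compare
    # the lowercased text of each token, so a rule phrase such as "chairman"
    # also matches "Chairman" or "CHAIRMAN" in the input, and multi-word
    # phrases are matched token by token on the same attribute.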

    async def check(self, text: str) -> dict:
        """
        Checks a string for non-inclusive language based on rule definitions.
        """
        text = text.strip()
        if not text:
            raise ServiceError(status_code=400, detail="Input text is empty for inclusive language check.")
        try:
            nlp = self._get_nlp()
            if self.matcher is None:
                self.matcher = self._init_matcher(nlp)
            doc = nlp(text)
            matches = self.matcher(doc)
            results = []
            matched_spans = set()
            # Match exact phrases first, preferring longer matches at the same start
            for match_id, start, end in sorted(matches, key=lambda m: (m[1], -(m[2] - m[1]))):
                phrase = nlp.vocab.strings[match_id].lower()
                if any(start < e and s < end for s, e in matched_spans):
                    continue  # Skip matches overlapping an already accepted span
                matched_spans.add((start, end))
                rule = self.rules.get(phrase)
                if rule:
                    results.append({
                        "term": doc[start:end].text,
                        "type": rule["type"],
                        "note": rule["note"],
                        "suggestions": rule["considerate"],
                        "context": doc[start:end].sent.text,
                        "start_char": doc[start].idx,
                        "end_char": doc[end - 1].idx + len(doc[end - 1]),
                        "source": rule["source"]
                    })
            # Match individual token lemmas (fallback)
            for token in doc:
                lemma = token.lemma_.lower()
                if any(s <= token.i < e for s, e in matched_spans):
                    continue  # Token already covered by a phrase match above
                if lemma in self.rules:
                    rule = self.rules[lemma]
                    results.append({
                        "term": token.text,
                        "type": rule["type"],
                        "note": rule["note"],
                        "suggestions": rule["considerate"],
                        "context": token.sent.text,
                        "start_char": token.idx,
                        "end_char": token.idx + len(token),
                        "source": rule["source"]
                    })
| return {"issues": results} | |
| except Exception as e: | |
| logger.error(f"Inclusive language check error for text: '{text[:50]}...'", exc_info=True) | |
| raise ServiceError( | |
| status_code=500, | |
| detail="An internal error occurred during inclusive language checking." | |
| ) from e | |
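

# Minimal usage sketch, not part of the service itself: assumes the app
# package is importable, settings.INCLUSIVE_RULES_DIR points at a directory
# of *.yml rule files, and the spaCy model named by SPACY_MODEL_ID is
# installed. The sample sentence and printed fields are illustrative only.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        checker = InclusiveLanguageChecker()
        report = await checker.check("The chairman opened the meeting.")
        for issue in report["issues"]:
            print(f"{issue['term']!r} -> {issue['suggestions']} ({issue['note']})")

    asyncio.run(_demo())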