"""Download curriculum PDFs and extract course records from them."""

import os
import re
import zlib
from typing import Dict, List, Optional

import pdfplumber
import requests
from tqdm import tqdm


class PDFParser:
    """Download a curriculum PDF and turn its tables/text into course dicts.

    Each course is a plain ``dict`` with the keys ``id``, ``program_id``,
    ``semester``, ``name``, ``credits``, ``hours``, ``type``, ``source_pdf``
    and ``source_page``.
    """

    # Substrings looked for in (lower-cased) table headers to locate columns.
    NAME_INDICATORS = ('название', 'дисциплина', 'курс', 'предмет', 'name', 'course')
    SEMESTER_INDICATORS = ('семестр', 'semester', 'сем')
    CREDIT_INDICATORS = ('кредит', 'credit', 'зет', 'з.е.')
    HOUR_INDICATORS = ('час', 'hour', 'ауд')
    TYPE_INDICATORS = ('тип', 'type', 'вид')

    # Substrings looked for in a "type" cell to classify the course.
    REQUIRED_WORDS = ('обязательная', 'required', 'обяз')
    ELECTIVE_WORDS = ('по выбору', 'elective', 'выбор')

    _INT_RE = re.compile(r'\d+')

    def __init__(self, raw_dir: str = 'data/raw'):
        """Create a parser with its own HTTP session.

        Args:
            raw_dir: Directory where downloaded PDFs are stored.
        """
        self.raw_dir = raw_dir
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    def download_pdf(self, url: str, filename: str) -> Optional[str]:
        """Download ``url`` into the raw directory unless it is already there.

        The body is streamed into a temporary ``.part`` file and moved into
        place only after the transfer completes, so an interrupted download
        can never be mistaken for a finished one on the next call.

        Args:
            url: Address of the PDF.
            filename: File name to store it under inside the raw directory.

        Returns:
            The local path of the PDF, or ``None`` if the download failed.
        """
        local_path = os.path.join(self.raw_dir, filename)

        if os.path.exists(local_path):
            print(f'PDF уже загружен: {filename}')
            return local_path

        partial_path = local_path + '.part'
        try:
            print(f'Загрузка PDF: {url}')
            with self.session.get(url, stream=True, timeout=60) as response:
                response.raise_for_status()

                target_dir = os.path.dirname(local_path)
                if target_dir:
                    os.makedirs(target_dir, exist_ok=True)

                with open(partial_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:  # skip keep-alive chunks
                            f.write(chunk)

            os.replace(partial_path, local_path)
            print(f'PDF сохранен: {local_path}')
            return local_path
        except Exception as e:
            # Best-effort boundary: report the failure and let the caller
            # decide what to do with a missing file.
            print(f'Ошибка загрузки PDF {url}: {e}')
            try:
                os.remove(partial_path)
            except OSError:
                # Nothing was written (or it is already gone) - fine.
                pass
            return None

    def parse_pdf(self, pdf_path: str, program_id: str) -> List[Dict]:
        """Extract every course found in the PDF at ``pdf_path``.

        Args:
            pdf_path: Path of the PDF file to read.
            program_id: Identifier stored on every extracted course.

        Returns:
            The courses collected so far; on a parsing error this is
            whatever was gathered before the failure (possibly empty).
        """
        courses: List[Dict] = []

        try:
            with pdfplumber.open(pdf_path) as pdf:
                print(f'Парсинг PDF: {pdf_path}')

                pages = enumerate(tqdm(pdf.pages, desc='Страницы'), start=1)
                for page_num, page in pages:
                    courses.extend(
                        self._parse_page(page, page_num, program_id, pdf_path)
                    )

            print(f'Найдено курсов: {len(courses)}')
        except Exception as e:
            print(f'Ошибка парсинга PDF {pdf_path}: {e}')

        return courses

    def _parse_page(self, page, page_num: int, program_id: str,
                    pdf_path: str = '') -> List[Dict]:
        """Parse one page: tables first, plain text if no table yields courses.

        Args:
            page: Page object providing ``extract_tables()``/``extract_text()``.
            page_num: 1-based page number recorded on each course.
            program_id: Identifier stored on every extracted course.
            pdf_path: Path of the source PDF, used for ``source_pdf``.
        """
        courses: List[Dict] = []

        try:
            for table in page.extract_tables():
                courses.extend(
                    self._parse_table(table, page_num, program_id, pdf_path)
                )

            if not courses:
                courses = self._parse_text_fallback(
                    page, page_num, program_id, pdf_path
                )
        except Exception as e:
            print(f'Ошибка парсинга страницы {page_num}: {e}')

        return courses

    def _parse_table(self, table: list, page_num: int, program_id: str,
                     pdf_path: str = '') -> List[Dict]:
        """Convert a table (list of rows, first row = headers) into courses.

        Tables with fewer than two rows and rows with fewer than three
        cells are ignored.
        """
        courses: List[Dict] = []

        if not table or len(table) < 2:
            return courses

        headers = [str(cell).lower().strip() if cell else '' for cell in table[0]]

        for row in table[1:]:
            if not row or len(row) < 3:
                continue

            course = self._extract_course_from_row(
                row, headers, page_num, program_id, pdf_path
            )
            if course:
                courses.append(course)

        return courses

    def _extract_course_from_row(self, row: list, headers: list, page_num: int,
                                 program_id: str,
                                 pdf_path: str = '') -> Optional[Dict]:
        """Build a course dict from one table row.

        Returns:
            The course, or ``None`` when the row has no usable name
            (missing or shorter than three characters).
        """
        cells = [str(cell).strip() if cell else '' for cell in row]

        name = self._extract_name(cells, headers)
        if not name or len(name) < 3:
            return None

        return self._build_course(
            program_id=program_id,
            page_num=page_num,
            name=name,
            semester=self._extract_semester(cells, headers),
            credits=self._extract_credits(cells, headers),
            hours=self._extract_hours(cells, headers),
            course_type=self._extract_type(cells, headers),
            pdf_path=pdf_path,
        )

    @staticmethod
    def _make_course_id(program_id: str, page_num: int, name: str) -> str:
        """Return an id that is stable across interpreter runs.

        The built-in ``hash()`` of a string is randomized per process, so a
        CRC32 of the UTF-8 encoded name is used instead.
        """
        digest = zlib.crc32(name.encode('utf-8', errors='replace')) % 10000
        return f'{program_id}_{page_num}_{digest}'

    def _build_course(self, program_id: str, page_num: int, name: str,
                      semester: int, credits: int, hours: int,
                      course_type: str, pdf_path: str) -> Dict:
        """Assemble the course record shared by table and text parsing."""
        return {
            'id': self._make_course_id(program_id, page_num, name),
            'program_id': program_id,
            'semester': semester,
            'name': name,
            'credits': credits,
            'hours': hours,
            'type': course_type,
            'source_pdf': os.path.basename(pdf_path),
            'source_page': page_num
        }

    def _extract_int(self, row: list, headers: list, indicators,
                     default: int) -> int:
        """Return the first integer found in a column matching ``indicators``.

        Columns are scanned left to right; a matching column whose cell is
        empty or holds no digits is skipped and the scan continues.
        ``default`` is returned when nothing is found.
        """
        for i, header in enumerate(headers):
            if i >= len(row) or not row[i]:
                continue
            if not any(indicator in header for indicator in indicators):
                continue

            match = self._INT_RE.search(str(row[i]))
            if match:
                return int(match.group())

        return default

    def _extract_name(self, row: list, headers: list) -> str:
        """Return the course name: a name-like column, else the first cell."""
        for i, header in enumerate(headers):
            if any(indicator in header for indicator in self.NAME_INDICATORS):
                if i < len(row) and row[i]:
                    return row[i]

        if row and row[0]:
            return row[0]

        return ''

    def _extract_semester(self, row: list, headers: list) -> int:
        """Return the semester number from the row (defaults to 1)."""
        return self._extract_int(row, headers, self.SEMESTER_INDICATORS, 1)

    def _extract_credits(self, row: list, headers: list) -> int:
        """Return the credit count from the row (defaults to 0)."""
        return self._extract_int(row, headers, self.CREDIT_INDICATORS, 0)

    def _extract_hours(self, row: list, headers: list) -> int:
        """Return the hour count from the row (defaults to 0)."""
        return self._extract_int(row, headers, self.HOUR_INDICATORS, 0)

    def _extract_type(self, row: list, headers: list) -> str:
        """Classify the course as ``'required'`` or ``'elective'``.

        Falls back to ``'required'`` when no type column gives an answer.
        """
        for i, header in enumerate(headers):
            if i >= len(row) or not row[i]:
                continue
            if not any(indicator in header for indicator in self.TYPE_INDICATORS):
                continue

            text = str(row[i]).lower()
            # "Required" is checked first, matching the original precedence.
            if any(word in text for word in self.REQUIRED_WORDS):
                return 'required'
            if any(word in text for word in self.ELECTIVE_WORDS):
                return 'elective'

        return 'required'

    def _parse_text_fallback(self, page, page_num: int, program_id: str,
                             pdf_path: str = '') -> List[Dict]:
        """Extract courses from the page's plain text, one per line.

        A line containing 'семестр' updates the current semester (when it
        holds a number) and is not itself a course. Any other line longer
        than ten characters that is not purely digits becomes a course with
        zero credits/hours and type ``'required'``.
        """
        courses: List[Dict] = []

        try:
            text = page.extract_text()
            if not text:
                return courses

            current_semester = 1

            for raw_line in text.split('\n'):
                line = raw_line.strip()
                if not line:
                    continue

                if 'семестр' in line.lower():
                    match = self._INT_RE.search(line)
                    if match:
                        current_semester = int(match.group())
                    continue

                if len(line) > 10 and not line.isdigit():
                    courses.append(self._build_course(
                        program_id=program_id,
                        page_num=page_num,
                        name=line,
                        semester=current_semester,
                        credits=0,
                        hours=0,
                        course_type='required',
                        pdf_path=pdf_path,
                    ))
        except Exception as e:
            print(f'Ошибка fallback парсинга страницы {page_num}: {e}')

        return courses


def main():
    """Download a sample PDF and print how many courses were found."""
    parser = PDFParser()

    test_url = 'https://example.com/test.pdf'
    test_filename = 'test.pdf'

    local_path = parser.download_pdf(test_url, test_filename)
    if local_path:
        courses = parser.parse_pdf(local_path, 'test_program')
        print(f'Найдено курсов: {len(courses)}')


if __name__ == '__main__':
    main()