import json import os import re from pathlib import Path from typing import Dict, Any, Optional, Union from datetime import datetime def clean_json_string(text: str) -> str: """ Clean a string that might contain markdown formatting and extract just the JSON content. """ # Remove markdown code blocks text = re.sub(r'```json\s*\n?', '', text) text = re.sub(r'```\s*\n?', '', text) # Remove leading/trailing whitespace and newlines text = text.strip() return text def dump_json_record(filename: str, record: Union[Dict[str, Any], str], lineage_extraction_dumps_folder: str = "lineage_extraction_dumps") -> Union[Dict[str, Any], str]: """ Create a file under the lineagedb folder and dump a JSON record as a new line. Args: filename (str): The name of the file (without extension, .json will be added) record (Union[Dict[str, Any], str]): The JSON record to dump (can be dict or string) lineage_extraction_dumps_folder (str): The folder name for lineage database files (default: "lineage_extraction_dumps") Returns: Union[Dict[str, Any], str]: The processed record that was dumped to the file Example: dumped_data = dump_json_record("user_queries", {"query": "SELECT * FROM users"}) dumped_data = dump_json_record("outputs", "This is a string output") """ # Create the lineagedb folder if it doesn't exist # folder_path = Path(lineage_extraction_dumps_folder) # folder_path.mkdir(exist_ok=True) # Create the full file path with .json extension # file_path = folder_path / f"{filename}.json" # Handle different input types if isinstance(record, str): # Clean the string first to remove any markdown formatting cleaned_record = clean_json_string(record) # Try to parse as JSON first, then re-serialize properly try: # Parse the string as JSON to get the actual data parsed_data = json.loads(cleaned_record) # Re-serialize without escaping newlines and with proper formatting json_line = json.dumps(parsed_data, ensure_ascii=False, separators=(',', ':')) processed_record = parsed_data except json.JSONDecodeError: # If it's not valid JSON, treat it as a plain string json_line = json.dumps(cleaned_record, ensure_ascii=False) processed_record = cleaned_record elif isinstance(record, dict): # If it's already a dict, convert to JSON string json_line = json.dumps(record, ensure_ascii=False, separators=(',', ':')) processed_record = record else: # For other types, convert to string and then to JSON cleaned_record = clean_json_string(str(record)) try: parsed_data = json.loads(cleaned_record) json_line = json.dumps(parsed_data, ensure_ascii=False, separators=(',', ':')) processed_record = parsed_data except json.JSONDecodeError: json_line = json.dumps(cleaned_record, ensure_ascii=False) processed_record = cleaned_record # Append the JSON record as a new line to the file # with open(file_path, "a", encoding="utf-8") as f: # f.write(json_line + "\n") return processed_record def read_json_records(filename: str, lineagedb_folder: str = "lineagedb") -> list: """ Read all JSON records from a file in the lineagedb folder. Args: filename (str): The name of the file (without extension) lineagedb_folder (str): The folder name for lineage database files (default: "lineagedb") Returns: list: List of dictionaries containing the JSON records """ folder_path = Path(lineagedb_folder) file_path = folder_path / f"{filename}.json" records = [] if file_path.exists(): with open(file_path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if line: # Skip empty lines try: record = json.loads(line) records.append(record) except json.JSONDecodeError as e: print(f"Warning: Could not parse JSON line: {line[:50]}... Error: {e}") return records def clear_json_file(filename: str, lineagedb_folder: str = "lineagedb") -> None: """ Clear all records from a JSON file in the lineagedb folder. Args: filename (str): The name of the file (without extension) lineagedb_folder (str): The folder name for lineage database files (default: "lineagedb") """ folder_path = Path(lineagedb_folder) file_path = folder_path / f"{filename}.json" if file_path.exists(): file_path.unlink() # Delete the file print(f"Cleared file: {file_path}") def get_file_stats(filename: str, lineagedb_folder: str = "lineagedb") -> Dict[str, Any]: """ Get statistics about a JSON file in the lineagedb folder. Args: filename (str): The name of the file (without extension) lineagedb_folder (str): The folder name for lineage database files (default: "lineagedb") Returns: Dict[str, Any]: Statistics about the file including record count, file size, etc. """ folder_path = Path(lineagedb_folder) file_path = folder_path / f"{filename}.json" stats = { "filename": f"{filename}.json", "exists": file_path.exists(), "record_count": 0, "file_size_bytes": 0, "created_time": None, "modified_time": None } if file_path.exists(): stats["file_size_bytes"] = file_path.stat().st_size stats["created_time"] = datetime.fromtimestamp(file_path.stat().st_ctime).isoformat() stats["modified_time"] = datetime.fromtimestamp(file_path.stat().st_mtime).isoformat() # Count records with open(file_path, "r", encoding="utf-8") as f: stats["record_count"] = sum(1 for line in f if line.strip()) return stats