| | import csv |
| | import re |
| | import tokenize |
| | from io import StringIO |
| | import os |
| | from tqdm import tqdm |
| | import json |
| | import sys |
| | from functools import lru_cache |
| |
|
# Source files stored in a single CSV cell routinely exceed the default
# 128 KiB field limit, so raise it as far as the platform allows.  On some
# platforms (e.g. 64-bit Windows) sys.maxsize does not fit in the C long
# the csv module uses for the limit and raises OverflowError; fall back to
# the largest 32-bit value in that case.
try:
    csv.field_size_limit(sys.maxsize)
except OverflowError:
    csv.field_size_limit(2**31 - 1)
| |
|
| | |
| |
|
| | |
| | _LINE_COMMENT_PATTERNS = { |
| | "python": re.compile(r"#(.*)$"), |
| | "shell": re.compile(r"#(.*)$"), |
| | "r": re.compile(r"#(.*)$"), |
| | "matlab": re.compile(r"%(.*)$"), |
| | "fortran": re.compile(r"!(.*)$"), |
| | "c/c++": re.compile(r"//(.*)$"), |
| | "java": re.compile(r"//(.*)$"), |
| | "go": re.compile(r"//(.*)$"), |
| | "rust": re.compile(r"//(.*)$"), |
| | } |
| |
|
| | |
| | _BLOCK_COMMENT_PATTERNS = { |
| | "python": re.compile(r'("""[\s\S]*?"""|\'\'\'[\s\S]*?\'\'\')'), |
| | "c/c++": re.compile(r"/\*([\s\S]*?)\*/"), |
| | "java": re.compile(r"/\*([\s\S]*?)\*/"), |
| | "rust": re.compile(r"/\*([\s\S]*?)\*/"), |
| | "go": re.compile(r"/\*([\s\S]*?)\*/"), |
| | "matlab": re.compile(r"%\{([\s\S]*?)%\}"), |
| | } |
| |
|
| | |
| | _FUNCTION_PATTERNS = { |
| | "python": re.compile(r"^[ \t]*def\s+(\w+)\s*\(([^)]*)\)", re.MULTILINE), |
| | "java": re.compile(r""" |
| | (?:public|protected|private|static|final|native|synchronized|abstract|\s)* |
| | \s* |
| | (?:[\w\<\>\[\],\s]+) |
| | \s+ |
| | (\w+) |
| | \s*\(([^)]*)\) |
| | (?:\s*throws\s+[\w,\s]+)? |
| | \s*\{ |
| | """, re.MULTILINE | re.VERBOSE), |
| | "c/c++": re.compile(r""" |
| | ^[ \t]* |
| | (?!.*typedef) |
| | (?!.*\#) |
| | (?:[\w\*\s&]+) |
| | \b(\w+)\s* |
| | \(([^)]*)\) |
| | \s*(?:const)? |
| | \s*(?:override)? |
| | \s*(?:noexcept)? |
| | \s*\{ |
| | """, re.MULTILINE | re.VERBOSE), |
| | "go": re.compile(r"\bfunc\s+(?:\([^)]+\)\s*)?(\w+)\s*\(([^)]*)\)", re.MULTILINE), |
| | "rust": re.compile(r"\b(?:pub\s+)?(?:async\s+)?fn\s+(\w+)\s*(?:<[^>]*>)?\s*\(([^)]*)\)", re.MULTILINE), |
| | "r": re.compile(r"(\w+)\s*(?:<-|=)\s*function\s*\(([^)]*)\)", re.MULTILINE), |
| | "matlab": re.compile(r"^[ \t]*function\s+(?:(?:\[?[\w,\s]*\]?\s*=\s*)?(\w+)|(\w+))\s*\(([^)]*)\)", re.MULTILINE), |
| | "shell": re.compile(r"^[ \t]*(?:function\s+)?(\w+)\s*\(\)\s*\{", re.MULTILINE), |
| | "fortran": re.compile(r""" |
| | (?i) |
| | ^[ \t]* |
| | (?:recursive\s+)? |
| | (?:pure\s+)? |
| | (?:elemental\s+)? |
| | (?:[\w\*]+(?:\s*\([^)]*\))?\s+)? |
| | (function|subroutine)\s+ |
| | (\w+)\s* |
| | \(([^)]*)\) |
| | """, re.MULTILINE | re.VERBOSE), |
| | } |
| |
|
| | |
| | _REMOVE_COMMENT_PATTERNS = { |
| | "python_line": re.compile(r'#.*$', re.MULTILINE), |
| | "python_triple_dq": re.compile(r'"""[\s\S]*?"""'), |
| | "python_triple_sq": re.compile(r"'''[\s\S]*?'''"), |
| | "c_line": re.compile(r'//.*$', re.MULTILINE), |
| | "c_block": re.compile(r'/\*[\s\S]*?\*/'), |
| | "shell_line": re.compile(r'#.*$', re.MULTILINE), |
| | "matlab_line": re.compile(r'%.*$', re.MULTILINE), |
| | "matlab_block": re.compile(r'%\{[\s\S]*?%\}'), |
| | "fortran_line": re.compile(r'!.*$', re.MULTILINE), |
| | } |
| |
|
def detect_language(file_path: str):
    """Guess the language of a file from its extension alone.

    Returns a language key understood by the pattern tables above.  For an
    unrecognised extension the (lower-cased, stripped) extension itself is
    returned, and "" for a path with no extension — callers treat anything
    not present in the pattern dicts as "no comment/function syntax known".
    """
    ext_map = {
        ".py": "python",
        ".java": "java",
        # C and C++ share comment and definition syntax here.
        ".c": "c/c++",
        ".h": "c/c++",
        ".hh": "c/c++",
        ".hpp": "c/c++",
        ".cpp": "c/c++",
        ".cc": "c/c++",
        ".cxx": "c/c++",
        ".c++": "c/c++",
        # ".f" also covers ".F": the extension is lower-cased before lookup
        # (the original listed a ".F" key that was therefore unreachable).
        ".f": "fortran",
        ".f90": "fortran",
        ".f95": "fortran",
        ".r": "r",
        ".m": "matlab",
        ".sh": "shell",
        ".bash": "shell",
        ".rs": "rust",
        ".go": "go",
    }

    ext = os.path.splitext(file_path)[1].lower().strip()
    return ext_map.get(ext, ext)
| |
|
| |
|
def count_comments(code: str, lang: str):
    """Count comment lines and whitespace-separated comment tokens.

    Supports Python/Java/C-C++/Fortran/Matlab/R/Shell/Rust/Go; "jupyter"
    is treated as Python.  Returns ``(comment_lines, comment_token_count)``.
    Uses the precompiled module-level pattern tables for performance.
    Detection is heuristic (regex-based), so markers inside complex string
    literals can be miscounted.
    """
    if lang == "jupyter":
        lang = "python"

    comment_lines = 0
    comment_tokens = []
    lines = code.splitlines()

    # Line indices already claimed by a block comment; the line-comment
    # pass below skips these to avoid double counting.
    block_comment_line_indices = set()

    if lang in _BLOCK_COMMENT_PATTERNS:
        patt = _BLOCK_COMMENT_PATTERNS[lang]

        if lang == "python":
            for match in patt.finditer(code):
                start_pos = match.start()
                end_pos = match.end()

                start_line = code[:start_pos].count('\n')
                end_line = code[:end_pos].count('\n')

                # Heuristic: a triple-quoted string directly assigned with
                # "=" is a data literal, not a docstring, so its lines stay
                # eligible for the line-comment pass.  (Its content is still
                # counted below — behaviour preserved from the original.)
                prefix = code[max(0, start_pos - 20):start_pos].strip()
                if not prefix.endswith('='):
                    for line_idx in range(start_line, end_line + 1):
                        block_comment_line_indices.add(line_idx)

                # group(1) includes the quotes; drop the 3-char delimiters.
                # (The original branched on the quote style here, but both
                # branches performed exactly the same slice.)
                block_content = match.group(1)[3:-3]

                for b in block_content.splitlines():
                    comment_lines += 1
                    if b.strip():
                        comment_tokens.extend(b.strip().split())
        else:
            for match in patt.finditer(code):
                start_pos = match.start()
                end_pos = match.end()

                start_line = code[:start_pos].count('\n')
                end_line = code[:end_pos].count('\n')

                for line_idx in range(start_line, end_line + 1):
                    block_comment_line_indices.add(line_idx)

                # Some patterns capture the comment body (group 1), some
                # only the whole match; fall back accordingly.
                block_content = match.group(1) if match.lastindex else match.group(0)
                for b in block_content.splitlines():
                    comment_lines += 1
                    if b.strip():
                        comment_tokens.extend(b.strip().split())

    if lang in _LINE_COMMENT_PATTERNS:
        patt = _LINE_COMMENT_PATTERNS[lang]
        for line_idx, line in enumerate(lines):
            if line_idx in block_comment_line_indices:
                continue

            m = patt.search(line)
            if m:
                # Crude string-literal guard: an even number of unescaped
                # quotes before the marker suggests the marker is not inside
                # a string.  Misfires on nested/f-string quoting.
                prefix = line[:m.start()]
                single_quotes = prefix.count("'") - prefix.count("\\'")
                double_quotes = prefix.count('"') - prefix.count('\\"')

                if single_quotes % 2 == 0 and double_quotes % 2 == 0:
                    comment_lines += 1
                    text = m.group(1)
                    if text:
                        comment_tokens.extend(text.strip().split())

    return comment_lines, len(comment_tokens)
| |
|
| |
|
def count_functions_and_parameters(code: str, lang: str):
    """Count function definitions and their declared parameters.

    Multi-language (including Fortran subroutine/function); "jupyter" is
    treated as Python.  Returns ``(function_count, parameter_count)``.
    Uses the precompiled module-level pattern table for performance.
    """
    if lang == "jupyter":
        lang = "python"

    pattern = _FUNCTION_PATTERNS.get(lang)
    if pattern is None:
        return 0, 0

    # Strip comments first so commented-out definitions are not counted.
    stripped = _remove_comments(code, lang)
    matches = pattern.findall(stripped)

    total_params = 0
    for groups in matches:
        # Pick the capture group that holds the parameter list; the group
        # layout differs per language (see _FUNCTION_PATTERNS).
        if lang == "fortran":
            raw = groups[2]
        elif lang == "matlab":
            raw = groups[2] if len(groups) > 2 else ""
        else:
            raw = groups[1] if isinstance(groups, tuple) and len(groups) > 1 else ""

        raw = raw.strip() if raw else ""
        if raw:
            total_params += len([p for p in (s.strip() for s in raw.split(",")) if p])

    return len(matches), total_params
| |
|
| |
|
def _remove_comments(code: str, lang: str) -> str:
    """Blank out comments so function-definition regexes see clean code.

    Block comments are replaced by an equal number of newlines to keep the
    line numbering of the remaining code intact; line comments are simply
    deleted.  Uses the precompiled module-level patterns.
    """
    def _keep_newlines(m):
        # Preserve the line count of the removed block.
        return '\n' * m.group(0).count('\n')

    patterns = _REMOVE_COMMENT_PATTERNS

    if lang in ("python", "jupyter"):
        code = patterns["python_line"].sub('', code)
        code = patterns["python_triple_dq"].sub(_keep_newlines, code)
        code = patterns["python_triple_sq"].sub(_keep_newlines, code)
    elif lang in ("c/c++", "java", "rust", "go"):
        code = patterns["c_line"].sub('', code)
        code = patterns["c_block"].sub(_keep_newlines, code)
    elif lang in ("shell", "r"):
        # R uses the same "#" line-comment syntax as shell.
        code = patterns["shell_line"].sub('', code)
    elif lang == "matlab":
        code = patterns["matlab_line"].sub('', code)
        code = patterns["matlab_block"].sub(_keep_newlines, code)
    elif lang == "fortran":
        code = patterns["fortran_line"].sub('', code)

    return code
| |
|
| |
|
def count_tokens(code: str):
    """Count Python tokens; fall back to whitespace splitting otherwise.

    Non-Python code (or broken Python) makes ``tokenize`` raise, in which
    case the token count is approximated by ``len(code.split())``.
    """
    try:
        return len(list(tokenize.generate_tokens(StringIO(code).readline)))
    except Exception:
        # The original used a bare ``except:``, which also swallowed
        # KeyboardInterrupt/SystemExit; keep the deliberate best-effort
        # fallback but only for ordinary exceptions.
        return len(code.split())
| |
|
| |
|
def analyze_code(code_str, code_path):
    """Compute per-file statistics for one source file.

    ``code_str`` is the file content, ``code_path`` its path (used only for
    extension-based language detection).  Returns a dict of counts; ``idx``
    is left as None for the caller to fill in.
    """
    lang = detect_language(code_path)

    # count("\n") + 1 counts the final (possibly unterminated) line; an
    # empty string therefore reports 1 total line — behaviour preserved.
    lines = code_str.count("\n") + 1
    empty_lines = sum(1 for line in code_str.splitlines() if not line.strip())
    comment_lines, comment_token_count = count_comments(code_str, lang)
    functions, parameters = count_functions_and_parameters(code_str, lang)
    tokens = count_tokens(code_str)

    return {
        "idx": None,
        "language": lang,
        "total_lines": lines,
        "comment_lines": comment_lines,
        # "comment_tokenst" is a long-standing typo kept so existing
        # downstream readers keep working; the correctly spelled key is
        # emitted alongside it.
        "comment_tokenst": comment_token_count,
        "comment_tokens": comment_token_count,
        "empty_lines": empty_lines,
        # May go negative when comment heuristics over-count; preserved.
        "code_lines": lines - empty_lines - comment_lines,
        "tokens": tokens,
        "functions": functions,
        "parameters": parameters,
    }
| |
|
| |
|
if __name__ == "__main__":
    input_dir = "/home/weifengsun/tangou1/domain_code/src/datasets/data_merged"
    output_dir = "/home/weifengsun/tangou1/domain_code/src/datasets/analysis2"
    # Ensure the destination exists before the first write.
    os.makedirs(output_dir, exist_ok=True)

    for i in range(110, 120):
        input_path = os.path.join(input_dir, f"{i:03}.csv")
        output_path = os.path.join(output_dir, f"{i:03}.jsonl")

        results = []

        with open(input_path, "r", encoding="utf-8", errors="replace") as f:
            # NUL bytes make the csv module raise; strip them on the fly.
            filtered = (line.replace('\0', '') for line in f)
            reader = csv.DictReader(filtered)

            for idx, row in tqdm(enumerate(reader)):
                code_str = row.get("text")
                if code_str is None:
                    # Malformed row with no source content: skip instead of
                    # crashing inside analyze_code.
                    continue
                # Prefer "repo_path", fall back to "path"; default to "" so
                # detect_language never receives None.
                code_path = row.get("repo_path") or row.get("path") or ""

                result = analyze_code(code_str, code_path)
                result["idx"] = f"{i:03}-{idx}"
                results.append(result)

        with open(output_path, "w", encoding="utf-8") as out:
            for r in results:
                out.write(json.dumps(r) + "\n")
|