| | import streamlit as st |
| | import pandas as pd |
| | import plotly.express as px |
| | import plotly.graph_objects as go |
| | from Bio import pairwise2 |
| | from collections import defaultdict |
| | import re |
| |
|
| | |
# Drug-resistance loci checked for every mutation.
# Columns: gene name, first base, last base (both bounds are treated as
# inclusive by the range test in find_mutations_in_chunk), description.
# NOTE(review): coordinates presumably refer to the H37Rv reference genome;
# confirm against the annotation release in use.
_RESISTANCE_LOCI = (
    ('rpoB', 759807, 763325, 'RNA polymerase β subunit (Rifampicin resistance)'),
    ('katG', 2153889, 2156111, 'Catalase-peroxidase (Isoniazid resistance)'),
    ('inhA', 1674202, 1675011, 'Enoyl-ACP reductase (Isoniazid resistance)'),
    ('gyrA', 7302, 9818, 'DNA gyrase subunit A (Fluoroquinolone resistance)'),
)

# Maps gene name -> {'range': (first, last), 'description': str}.
IMPORTANT_GENES = {
    locus_name: {'range': (first_base, last_base), 'description': summary}
    for locus_name, first_base, last_base, summary in _RESISTANCE_LOCI
}
| |
|
def read_fasta_from_upload(uploaded_file):
    """Read the nucleotide sequence from an uploaded FASTA file.

    Header lines (lines starting with ``>``) are skipped and every other
    line is concatenated in file order, so a multi-record file yields its
    records joined end to end. A file without any header line is read as a
    bare sequence.

    Args:
        uploaded_file: Object whose ``getvalue()`` returns the raw file
            bytes (for example a Streamlit ``UploadedFile``).

    Returns:
        The sequence as one upper-case string with all whitespace removed.

    Raises:
        ValueError: If the upload contains no sequence data at all.
        UnicodeDecodeError: If the upload is not valid UTF-8.
    """
    # 'utf-8-sig' drops a leading byte-order mark, which would otherwise
    # hide the '>' of the first header line.
    text = uploaded_file.getvalue().decode('utf-8-sig')

    # splitlines() copes with LF, CRLF and CR line endings alike;
    # line.split() removes spaces, tabs and any stray carriage returns.
    sequence = ''.join(
        ''.join(line.split())
        for line in text.splitlines()
        if not line.lstrip().startswith('>')
    )

    if not sequence:
        raise ValueError("FASTA upload contains no sequence data")
    return sequence.upper()
| |
|
def split_genome_into_chunks(sequence, chunk_size=10000, overlap=100):
    """Split a sequence into fixed-size, overlapping windows.

    Consecutive windows start ``chunk_size - overlap`` bases apart. Splitting
    stops with the first window that reaches the end of the sequence, so no
    trailing window is produced that lies wholly inside the previous one.

    Args:
        sequence: The sequence to split (any sliceable with ``len``).
        chunk_size: Maximum window length; must be positive.
        overlap: Bases shared by consecutive windows; must satisfy
            ``0 <= overlap < chunk_size``.

    Returns:
        A ``(chunks, positions)`` tuple of parallel lists: each window and
        its 0-based start offset within ``sequence``. Both lists are empty
        for an empty sequence.

    Raises:
        ValueError: If ``chunk_size``/``overlap`` would give a non-positive
            step or would skip bases between windows.
    """
    step = chunk_size - overlap
    if chunk_size <= 0 or overlap < 0 or step <= 0:
        raise ValueError(
            f"chunk_size must be positive and 0 <= overlap < chunk_size "
            f"(got chunk_size={chunk_size}, overlap={overlap})"
        )

    total = len(sequence)
    chunks = []
    positions = []
    for start in range(0, total, step):
        chunks.append(sequence[start:start + chunk_size])
        positions.append(start)
        # This window already covers the tail; a further one would only
        # repeat bases that have been emitted.
        if start + chunk_size >= total:
            break
    return chunks, positions
| |
|
def find_mutations_in_chunk(ref_chunk, query_chunk, chunk_start):
    """Globally align one reference/query chunk pair and list the differences.

    Args:
        ref_chunk: Reference bases for this window.
        query_chunk: Query bases for this window.
        chunk_start: 0-based offset of ``ref_chunk`` within the reference
            genome.

    Returns:
        A list with one dict per differing alignment column, holding:

        * ``position``: 1-based reference coordinate, the convention the
          ``IMPORTANT_GENES`` ranges are tested with. For a base inserted
          in the query this is the coordinate of the preceding reference
          base (``chunk_start`` when the insertion precedes the chunk's
          first base).
        * ``ref_base``: reference character of the column (``'-'`` for an
          insertion).
        * ``query_base``: query character, or the string ``'None'`` for a
          deleted base.
        * ``type``: ``'SNP'`` when neither side is a gap, else ``'INDEL'``.
        * ``context``: the column in brackets with up to five aligned
          characters on either side, for both sequences.
        * ``gene``, ``gene_position``, ``gene_description``: present only
          when ``position`` falls inside an ``IMPORTANT_GENES`` range;
          ``gene_position`` is 1-based from the range start.

        The list is empty when no alignment is produced.
    """
    mutations = []

    # Only the first alignment is ever used, so ask pairwise2 for a single
    # traceback instead of enumerating every co-optimal alignment.
    alignments = pairwise2.align.globalms(
        ref_chunk, query_chunk,
        match=2,
        mismatch=-3,
        open=-10,
        extend=-0.5,
        one_alignment_only=True,
    )

    if not alignments:
        return mutations

    alignment = alignments[0]
    ref_aligned, query_aligned = alignment[0], alignment[1]

    # Number of reference bases consumed so far, counting the current column.
    ref_bases_seen = 0
    for i, (ref_base, query_base) in enumerate(zip(ref_aligned, query_aligned)):
        if ref_base != '-':
            ref_bases_seen += 1

        if ref_base == query_base:
            continue

        # chunk_start is 0-based and ref_bases_seen is a 1-based count, so
        # their sum is the 1-based genome coordinate. The previous extra
        # "- 1" produced 0-based positions that were then compared against
        # 1-based gene ranges.
        abs_pos = chunk_start + ref_bases_seen
        is_snp = ref_base != '-' and query_base != '-'
        mut = {
            'position': abs_pos,
            'ref_base': ref_base,
            'query_base': query_base if query_base != '-' else 'None',
            'type': 'SNP' if is_snp else 'INDEL',
            'context': {
                'ref': ref_aligned[max(0, i - 5):i] + '[' + ref_base + ']' + ref_aligned[i + 1:i + 6],
                'query': query_aligned[max(0, i - 5):i] + '[' + query_base + ']' + query_aligned[i + 1:i + 6],
            },
        }

        # Annotate mutations that fall inside a resistance-associated gene.
        for gene, info in IMPORTANT_GENES.items():
            start, end = info['range']
            if start <= abs_pos <= end:
                mut['gene'] = gene
                mut['gene_position'] = abs_pos - start + 1
                mut['gene_description'] = info['description']

        mutations.append(mut)

    return mutations
| |
|
def visualize_mutations(mutations, genome_length):
    """Build the overview figure: one track per gene plus mutation markers.

    Args:
        mutations: Mutation dicts providing at least the ``position``,
            ``type``, ``ref_base`` and ``query_base`` keys.
        genome_length: Accepted as part of the interface; this function
            does not read it.

    Returns:
        A ``plotly.graph_objects.Figure`` with a thick line for every entry
        of ``IMPORTANT_GENES`` and, when ``mutations`` is non-empty, a marker
        trace (red for SNPs, blue for everything else) drawn just above.
    """
    def hover_label(row):
        # Text shown when hovering over a single mutation marker.
        return (f"Position: {row['position']}<br>"
                f"Type: {row['type']}<br>"
                f"Change: {row['ref_base']}->{row['query_base']}")

    track_y = 1
    figure = go.Figure()

    # Gene tracks, all drawn on the same horizontal line.
    for gene_name, gene_info in IMPORTANT_GENES.items():
        first, last = gene_info['range']
        figure.add_trace(go.Scatter(
            x=[first, last],
            y=[track_y, track_y],
            mode='lines',
            name=gene_name,
            line=dict(width=10),
            hoverinfo='text',
            hovertext=f"{gene_name}: {first}-{last}",
        ))

    # Mutation markers, skipped entirely when nothing was found.
    frame = pd.DataFrame(mutations)
    if not frame.empty:
        marker_colors = ['blue' if kind != 'SNP' else 'red' for kind in frame['type']]
        figure.add_trace(go.Scatter(
            x=frame['position'],
            y=[1.1] * len(frame),
            mode='markers',
            name='Mutations',
            marker=dict(color=marker_colors, size=8),
            hoverinfo='text',
            hovertext=frame.apply(hover_label, axis=1),
        ))

    figure.update_layout(
        title="Genome-wide Mutation Distribution",
        xaxis_title="Genome Position",
        yaxis_visible=False,
        showlegend=True,
        height=400,
    )

    return figure
| |
|
def analyze_mutations(mutations):
    """Summarise a list of mutation dicts.

    Args:
        mutations: Sized collection of dicts, each with a ``type`` key and
            optionally a ``gene`` key.

    Returns:
        A dict with ``total_mutations`` (all records), ``snps`` and
        ``indels`` (records whose type is exactly ``'SNP'`` / ``'INDEL'``),
        ``by_gene`` (a ``defaultdict(int)`` counting records per gene) and
        ``important_mutations`` (the records that carry a ``gene`` key, in
        input order).
    """
    total = len(mutations)
    snp_total = 0
    indel_total = 0
    per_gene = defaultdict(int)
    gene_hits = []

    for record in mutations:
        kind = record['type']
        if kind == 'SNP':
            snp_total += 1
        elif kind == 'INDEL':
            indel_total += 1

        # Only records annotated with a gene feed the per-gene tallies.
        if 'gene' in record:
            per_gene[record['gene']] += 1
            gene_hits.append(record)

    return {
        'total_mutations': total,
        'snps': snp_total,
        'indels': indel_total,
        'by_gene': per_gene,
        'important_mutations': gene_hits,
    }
| |
|
def _mutation_key(mutation):
    """Return the fields that identify one mutation across overlapping chunks."""
    return (
        mutation['position'],
        mutation['type'],
        mutation['ref_base'],
        mutation['query_base'],
    )


def main():
    """Run the Streamlit app: upload two genomes, compare them, show results.

    The page collects a reference and a query FASTA plus chunking settings.
    When "Run Analysis" is pressed, both genomes are split into overlapping
    chunks, matching chunks are aligned pairwise, and the mutations found
    are summarised, plotted and offered as a CSV download. Any exception
    raised during the analysis is shown to the user as an error message.
    """
    st.title("M. tuberculosis Full Genome Comparison")

    st.markdown("""
    This tool performs whole-genome comparison of M. tuberculosis strains, identifying mutations
    and analyzing resistance-associated genes.

    **Instructions:**
    1. Upload your reference genome (typically H37Rv)
    2. Upload your query genome (clinical isolate)
    3. Configure analysis parameters if needed
    4. Run the analysis
    """)

    col1, col2 = st.columns(2)
    with col1:
        reference_file = st.file_uploader("Reference Genome (FASTA)", type=['fasta', 'fa'])
    with col2:
        query_file = st.file_uploader("Query Genome (FASTA)", type=['fasta', 'fa'])

    with st.expander("Advanced Settings"):
        chunk_size = st.slider("Analysis chunk size (bp)", 5000, 20000, 10000, 1000)
        overlap = st.slider("Chunk overlap (bp)", 50, 200, 100, 10)

    # The run button is only offered once both genomes are uploaded.
    if not (reference_file and query_file):
        return
    if not st.button("Run Analysis"):
        return

    with st.spinner("Analyzing genomes..."):
        try:
            ref_genome = read_fasta_from_upload(reference_file)
            query_genome = read_fasta_from_upload(query_file)

            progress_bar = st.progress(0)
            status = st.empty()

            status.text("Splitting genomes into chunks...")
            ref_chunks, chunk_positions = split_genome_into_chunks(ref_genome, chunk_size, overlap)
            query_chunks, _ = split_genome_into_chunks(query_genome, chunk_size, overlap)

            # zip() stops at the shorter list, so tell the user when part of
            # one genome is left out instead of dropping it silently.
            if len(ref_chunks) != len(query_chunks):
                st.warning(
                    "Reference and query genomes produce different numbers of chunks; "
                    "only the chunks present in both are compared."
                )

            status.text("Analyzing mutations...")
            all_mutations = []
            # Consecutive chunks share `overlap` bases, so a mutation in the
            # shared region is found once per chunk. Remember what earlier
            # chunks reported and keep each such mutation only once. Records
            # repeated within a single chunk (e.g. several inserted bases at
            # one reference position) are all kept.
            seen_keys = set()
            chunk_triples = list(zip(ref_chunks, query_chunks, chunk_positions))
            total_chunks = len(chunk_triples)

            for i, (ref_chunk, query_chunk, chunk_start) in enumerate(chunk_triples):
                progress_bar.progress((i + 1) / total_chunks)
                chunk_mutations = find_mutations_in_chunk(ref_chunk, query_chunk, chunk_start)
                all_mutations.extend(
                    m for m in chunk_mutations if _mutation_key(m) not in seen_keys
                )
                seen_keys.update(_mutation_key(m) for m in chunk_mutations)

            progress_bar.empty()
            status.empty()

            stats = analyze_mutations(all_mutations)

            st.success("Analysis complete!")

            st.header("Results Summary")
            col1, col2, col3 = st.columns(3)
            col1.metric("Total Mutations", stats['total_mutations'])
            col2.metric("SNPs", stats['snps'])
            col3.metric("INDELs", stats['indels'])

            st.plotly_chart(visualize_mutations(all_mutations, len(ref_genome)))

            st.header("Resistance-Associated Genes")
            gene_mutations = pd.DataFrame([
                {"Gene": gene, "Mutations": count, "Description": IMPORTANT_GENES[gene]['description']}
                for gene, count in stats['by_gene'].items()
            ])

            if not gene_mutations.empty:
                st.dataframe(gene_mutations)

            if stats['important_mutations']:
                st.header("Detailed Mutation Analysis")
                mutations_df = pd.DataFrame(stats['important_mutations'])
                st.dataframe(mutations_df)

                # NOTE(review): pressing a Streamlit widget reruns the script
                # and st.button is only True on the run caused by its own
                # click, so the results presumably disappear once the
                # download button is used - confirm and consider keeping
                # them in st.session_state.
                csv = mutations_df.to_csv(index=False)
                st.download_button(
                    "Download Results (CSV)",
                    csv,
                    "mtb_mutations.csv",
                    "text/csv",
                    key='download-csv'
                )

        except Exception as e:
            # Top-level UI boundary: report the failure instead of crashing the app.
            st.error(f"Analysis error: {str(e)}")


if __name__ == "__main__":
    main()