|
|
import gradio as gr |
|
|
import cv2 |
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
import tempfile |
|
|
import os |
|
|
|
|
|
class WatermarkRemover:
    """Watermark detection and removal built on classical OpenCV operations.

    The instance records a preferred compute device in ``self.device``
    ("cuda" when torch is importable and reports CUDA, otherwise "cpu").
    None of the methods in this class use that attribute; they all run
    through OpenCV and NumPy.
    """

    def __init__(self):
        self.device = "cpu"
        try:
            import torch

            if torch.cuda.is_available():
                self.device = "cuda"
        except Exception:
            # torch is optional: a missing or broken install, or a failing
            # CUDA probe, must not prevent the CPU-only pipeline from working.
            # Unlike a bare ``except``, this lets KeyboardInterrupt and
            # SystemExit propagate.
            self.device = "cpu"

    def detect_watermark_auto(self, frame, threshold=200, edge_threshold=100):
        """Build a binary mask of candidate watermark pixels for one frame.

        Three cues are OR-ed together:
        1. Brightness threshold
        2. Canny edge detection (dilated)
        3. Harris corner detection (dilated)

        The union is then cleaned with a morphological close and open, and
        connected components smaller than 50 pixels are dropped.

        Args:
            frame: Colour image that ``cv2.cvtColor`` can convert with
                ``COLOR_BGR2GRAY``.
            threshold: Grey level above which a pixel counts as bright.
            edge_threshold: Centre of the Canny hysteresis band; the low and
                high thresholds are 0.5x and 1.5x this value, so the default
                of 100 gives 50 and 150.

        Returns:
            Single-channel mask with the same height and width as ``frame``;
            255 marks candidate pixels and 0 everything else.
        """
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Cue 1: bright pixels.
        _, bright_mask = cv2.threshold(gray, threshold, 255, cv2.THRESH_BINARY)

        # Cue 2: edges, thickened slightly so thin outlines survive the
        # morphological opening below.
        edges = cv2.Canny(gray, 0.5 * edge_threshold, 1.5 * edge_threshold)
        edges_dilated = cv2.dilate(edges, np.ones((3, 3), np.uint8), iterations=1)

        # Cue 3: Harris corners, keeping responses above 1% of the strongest.
        corners = cv2.cornerHarris(gray.astype(np.float32), 2, 3, 0.04)
        corners = cv2.dilate(corners, None)
        corner_mask = np.zeros_like(gray)
        corner_mask[corners > 0.01 * corners.max()] = 255

        combined_mask = cv2.bitwise_or(bright_mask, edges_dilated)
        combined_mask = cv2.bitwise_or(combined_mask, corner_mask)

        # Close small gaps first, then open to remove isolated specks.
        kernel = np.ones((5, 5), np.uint8)
        combined_mask = cv2.morphologyEx(combined_mask, cv2.MORPH_CLOSE, kernel, iterations=2)
        combined_mask = cv2.morphologyEx(combined_mask, cv2.MORPH_OPEN, kernel, iterations=1)

        # Drop tiny components in a single pass: flag the undersized labels,
        # then use the label image as an index into that flag table.
        _, labels, stats, _ = cv2.connectedComponentsWithStats(combined_mask, connectivity=8)
        min_size = 50
        too_small = stats[:, cv2.CC_STAT_AREA] < min_size
        too_small[0] = False  # label 0 is the background; it is already 0
        combined_mask[too_small[labels]] = 0

        return combined_mask

    def inpaint_frame(self, frame, mask, method='telea'):
        """Inpaint the masked region of a single frame with OpenCV.

        Args:
            frame: Image to repair.
            mask: Mask passed to ``cv2.inpaint``; non-zero pixels are filled.
            method: 'telea' selects ``cv2.INPAINT_TELEA``; any other value
                selects ``cv2.INPAINT_NS`` (Navier-Stokes).

        Returns:
            The inpainted image returned by ``cv2.inpaint`` (radius 3).
        """
        flag = cv2.INPAINT_TELEA if method == 'telea' else cv2.INPAINT_NS
        return cv2.inpaint(frame, mask, 3, flag)

    def create_temporal_mask(self, frames, masks, min_consistency=0.7):
        """Replace per-frame masks with one mask of persistently flagged pixels.

        Watermarks usually appear in the same location across frames, so a
        pixel is kept only if it is set in more than ``min_consistency`` of
        the masks.

        Args:
            frames: Unused; kept so existing callers keep working.
            masks: Sequence of equally shaped masks (non-zero = flagged).
            min_consistency: Fraction of masks a pixel must exceed to be kept.

        Returns:
            ``masks`` itself when it holds fewer than two masks. Otherwise a
            list of ``len(masks)`` references to one shared uint8 array
            (255 = kept, 0 = dropped).
        """
        if len(masks) < 2:
            return masks

        # Accumulate a per-pixel hit count instead of stacking every mask,
        # so peak memory does not grow with the number of frames.
        hits = np.zeros(np.shape(masks[0]), dtype=np.int64)
        for mask in masks:
            hits += np.asarray(mask) > 0

        consistency = hits / len(masks)
        temporal_mask = (consistency > min_consistency).astype(np.uint8) * 255

        return [temporal_mask] * len(masks)
|
|
|
|
|
def process_video_watermark_removal(
    input_video,
    use_auto_detect,
    manual_mask_image,
    brightness_threshold,
    inpaint_method,
    use_temporal_consistency
):
    """Remove a watermark from a video and return the result with a log.

    The whole video is decoded into memory, one mask per frame is produced
    (auto-detected or copied from a manual mask), the masks are optionally
    merged across frames, and every frame is inpainted and written to a
    temporary ``.mp4`` file.

    Args:
        input_video: Path of the video to process, or None.
        use_auto_detect: When true, detect the watermark in every frame.
        manual_mask_image: Path of a mask image (white = area to inpaint).
            Only used when ``use_auto_detect`` is false; if it is missing or
            cannot be loaded, processing falls back to auto-detection.
        brightness_threshold: Brightness threshold passed to auto-detection.
        inpaint_method: 'telea', or any other value for Navier-Stokes.
        use_temporal_consistency: When true and there is more than one
            frame, merge the masks across frames before inpainting.

    Returns:
        ``(output_path, log_text)``. ``output_path`` is None when the video
        is missing, cannot be opened, yields no frames, or the output file
        cannot be created.
    """
    if input_video is None:
        return None, "❌ Please upload a video first!"

    status_log = ["🚀 Starting watermark removal process..."]

    remover = WatermarkRemover()
    status_log.append(f"✅ Initialized on device: {remover.device}")

    cap = cv2.VideoCapture(input_video)
    try:
        if not cap.isOpened():
            return None, "❌ Error: Could not open video file!"

        # Keep the frame rate as a float: truncating 29.97 to 29 would change
        # the duration of the output.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:  # also true for NaN
            status_log.append("⚠️ Could not read frame rate, assuming 30 fps")
            fps = 30.0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        status_log.append(f"📹 Video info: {width}x{height}, {fps:g} fps, {total_frames} frames")

        manual_mask = None
        if not use_auto_detect:
            if manual_mask_image is None:
                # Without this fallback the manual branch below would call
                # .copy() on None.
                status_log.append("⚠️ No manual mask provided, using auto-detection")
                use_auto_detect = True
            else:
                try:
                    with Image.open(manual_mask_image) as mask_file:
                        mask_pil = mask_file.convert('L')
                    manual_mask = np.array(mask_pil.resize((width, height)))
                    _, manual_mask = cv2.threshold(manual_mask, 127, 255, cv2.THRESH_BINARY)
                    status_log.append("✅ Loaded manual mask")
                except Exception as e:
                    status_log.append(f"⚠️ Error loading mask: {e}, using auto-detection")
                    use_auto_detect = True

        status_log.append("📥 Reading video frames...")
        frames = []
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)
    finally:
        cap.release()

    status_log.append(f"✅ Loaded {len(frames)} frames")
    if not frames:
        status_log.append("❌ Error: No frames could be read from the video!")
        return None, "\n".join(status_log)

    status_log.append("🎭 Generating masks...")
    if use_auto_detect:
        status_log.append(f"🔍 Auto-detecting watermarks (threshold: {brightness_threshold})...")
        masks = []
        for i, frame in enumerate(frames):
            masks.append(remover.detect_watermark_auto(frame, threshold=brightness_threshold))
            if i % 50 == 0:
                status_log.append(f"  Detected masks for {i}/{len(frames)} frames")
    else:
        masks = [manual_mask.copy() for _ in frames]
        status_log.append("✅ Using manual mask for all frames")

    if use_temporal_consistency and len(masks) > 1:
        status_log.append("⏱️ Applying temporal consistency...")
        masks = remover.create_temporal_mask(frames, masks)
        status_log.append("✅ Temporal consistency applied")

    # mkstemp returns an open descriptor; close it right away so no handle is
    # leaked and the writer can open the path itself.
    fd, output_path = tempfile.mkstemp(suffix='.mp4')
    os.close(fd)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    if not out.isOpened():
        out.release()
        os.remove(output_path)
        status_log.append("❌ Error: Could not create output video file!")
        return None, "\n".join(status_log)

    status_log.append(f"🎨 Inpainting frames using {inpaint_method} method...")
    completed = False
    try:
        for i, (frame, mask) in enumerate(zip(frames, masks)):
            out.write(remover.inpaint_frame(frame, mask, method=inpaint_method))
            if i % 30 == 0:
                status_log.append(f"  Processed {i}/{len(frames)} frames")
        completed = True
    finally:
        out.release()
        # Do not leave a partial file behind when inpainting fails.
        if not completed and os.path.exists(output_path):
            os.remove(output_path)

    status_log.append("✅ All frames inpainted successfully!")
    status_log.append(f"💾 Output saved to: {output_path}")
    status_log.append("🎉 Processing complete!")

    return output_path, "\n".join(status_log)
|
|
|
|
|
|
|
|
# Gradio UI: settings on the left, result and processing log on the right,
# help accordions below. Markdown bodies are indented with the code; nested
# bullets rely on their indentation relative to the parent list item.
with gr.Blocks(title="Video Watermark Remover", theme=gr.themes.Base()) as demo:
    gr.Markdown("""
    # 🎬 AI Video Watermark Remover

    Remove watermarks from videos using advanced computer vision techniques!

    ⚡ **Features:**
    - Auto-detection of watermarks
    - Manual mask support
    - Temporal consistency for better results
    - Multiple inpainting algorithms
    """)

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 📤 Input")
            input_video = gr.Video(label="Upload Video with Watermark")

            gr.Markdown("### 🎛️ Settings")

            with gr.Group():
                gr.Markdown("**Detection Method**")
                use_auto_detect = gr.Checkbox(
                    label="Auto-detect watermark",
                    value=True,
                    info="Automatically find watermark regions"
                )

                brightness_threshold = gr.Slider(
                    minimum=100,
                    maximum=255,
                    value=210,
                    step=5,
                    label="Brightness Threshold",
                    info="Higher values = detect only brighter watermarks"
                )

                manual_mask = gr.Image(
                    label="Manual Mask (optional) - White = watermark area, Black = keep",
                    type="filepath"
                )

            with gr.Group():
                gr.Markdown("**Processing Options**")

                inpaint_method = gr.Radio(
                    choices=['telea', 'ns'],
                    value='telea',
                    label="Inpainting Method",
                    info="Telea = faster, NS = higher quality"
                )

                use_temporal = gr.Checkbox(
                    label="Use temporal consistency",
                    value=True,
                    info="Improves results by analyzing multiple frames"
                )

            process_btn = gr.Button("🚀 Remove Watermark", variant="primary", size="lg")

        with gr.Column(scale=1):
            gr.Markdown("### ✨ Output")
            output_video = gr.Video(label="Watermark-Free Video")

            status_box = gr.Textbox(
                label="📋 Processing Log",
                lines=15,
                max_lines=20,
                show_copy_button=True
            )

    with gr.Accordion("💡 Tips & Help", open=False):
        gr.Markdown("""
        ### How to Use:

        1. **Upload Video**: Click to upload or drag & drop your video
        2. **Choose Method**:
           - ✅ Auto-detect: For solid, bright watermarks (logos, timestamps)
           - 🎭 Manual mask: For complex or semi-transparent watermarks
        3. **Adjust Settings**:
           - Increase threshold if detecting too much
           - Decrease threshold if missing watermark
        4. **Process**: Click the button and wait

        ### Best Practices:

        - **Solid watermarks**: Use auto-detect with high threshold (200-240)
        - **Faint watermarks**: Lower threshold (150-180)
        - **Complex watermarks**: Create a manual mask in any image editor
        - **Temporal consistency**: Keep ON for videos with static watermarks

        ### Inpainting Methods:

        - **Telea**: Fast, good for most cases
        - **Navier-Stokes (NS)**: Slower but better quality for complex textures

        ### Creating Manual Masks:

        1. Take a screenshot from your video
        2. Open in any image editor (Paint, Photoshop, GIMP)
        3. Paint the watermark area **WHITE**
        4. Keep everything else **BLACK**
        5. Save as PNG/JPG and upload

        ### Limitations:

        - Moving watermarks are harder to remove
        - Very large watermarks may leave artifacts
        - Processing time increases with video length

        ### Algorithm Details:

        This app uses:
        - **Edge detection**: Finds watermark boundaries
        - **Corner detection**: Identifies logo corners
        - **Brightness analysis**: Detects bright watermarks
        - **OpenCV inpainting**: Fills detected regions
        - **Temporal analysis**: Uses multiple frames for consistency
        """)

    with gr.Accordion("📚 Technical Information", open=False):
        gr.Markdown("""
        ### Algorithms Used:

        1. **Watermark Detection**:
           - Canny edge detection
           - Harris corner detection
           - Brightness thresholding
           - Morphological operations

        2. **Inpainting**:
           - Telea algorithm (Fast Marching Method)
           - Navier-Stokes based method

        3. **Temporal Consistency**:
           - Cross-frame mask refinement
           - Persistent region detection

        ### For Better Results:

        For production use, consider deep learning models:
        - **ProPainter** (ICCV 2023) - State-of-the-art
        - **E2FGVI** (CVPR 2022) - Fast and efficient
        - **DiffuEraser** (2025) - Diffusion-based

        ### References:
        - [ProPainter Paper](https://arxiv.org/abs/2309.03897)
        - [Video Inpainting Survey](https://arxiv.org/abs/2401.03395)
        - [OpenCV Inpainting Docs](https://docs.opencv.org/4.x/df/d3d/tutorial_py_inpainting.html)
        """)

    # The input order must match the parameter order of
    # process_video_watermark_removal.
    process_btn.click(
        fn=process_video_watermark_removal,
        inputs=[
            input_video,
            use_auto_detect,
            manual_mask,
            brightness_threshold,
            inpaint_method,
            use_temporal
        ],
        outputs=[output_video, status_box]
    )

    gr.Markdown("""
    ---
    <div style="text-align: center;">
        <p>Made with ❤️ using Gradio | Enhanced with OpenCV</p>
        <p>⭐ Star on <a href="https://github.com/sczhou/ProPainter" target="_blank">ProPainter</a> for ML-based watermark removal</p>
    </div>
    """)

if __name__ == "__main__":
    demo.launch()