# YOND/data_process/yond_datasets.py
import pickle as pkl
import glob
import warnings
import torch
import numpy as np
import scipy.io as sio
import cv2
import os
import h5py
import rawpy
from matplotlib import pyplot as plt
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from .unprocess import mosaic, unprocess, random_gains
from .process import *
from utils import *
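# Bayer-pattern augmentation: pack the RGGB planes back into a flat Bayer mosaic, rotate
# the mosaic by k*90 degrees (which cycles the CFA phase among RGGB/GRBG/GBRG/BGGR),
# then repack into RGGB planes. rggb2bayer / bayer2rggb are assumed to come from utils.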
def bayer_aug(rggb, k=0):
bayer = rggb2bayer(rggb)
bayer = np.rot90(bayer, k=k, axes=(-2,-1))
rggb = bayer2rggb(bayer)
return rggb
# def get_threshold(data, step=1, mode='score2', print=False):
# if mode == 'naive':
# quants = list(range(step, 100, step))
# ths = np.percentile(data.reshape(-1), quants, method='linear')
# ths = np.array(ths.tolist() + [data.max()])
# th = ths[0]
# diffs = ths[1:] - ths[:-1]
# gap = data.max() / (100/step)**2
# for i in range(len(diffs)):
# if diffs[i] > gap:
# if print: log(f'Adaptive percent: {quants[i]}% - th: {ths[i]*959:.2f}')
# th = ths[i]
# break
# elif mode == 'score':
# quants = np.linspace(step, 100, 100//step)
# ths = np.percentile(data.reshape(-1), quants, method='linear')
# diffs = ths[1:] - ths[:-1]
# quants = quants[:-1]
# score = diffs/quants
# i = np.argmin(score)
# th = ths[i]
# if print: log(f'Adaptive percent: {quants[i]}% - th: {ths[i]*959:.2f}')
# return th
# Raw AWGN Dataset(Raw->Raw)
class SID_Raw_Dataset(Dataset):
def __init__(self, args=None):
super().__init__()
self.default_args()
if args is not None:
for key in args:
self.args[key] = args[key]
self.initialization()
def default_args(self):
self.args = {}
self.args['root_dir'] = 'SID'
self.args['crop_per_image'] = 8
self.args['crop_size'] = 512
self.args['ori'] = False
self.args['iso'] = None
self.args['dgain'] = None
self.args['params'] = None
self.args['lock_wb'] = False
self.args['gpu_preprocess'] = False
self.args['dstname'] = 'SID'
self.args['mode'] = 'train'
self.args['wp'] = 16383
self.args['bl'] = 512
self.args['command'] = ''
def initialization(self):
        # Collect the data paths
self.suffix = 'npy'
self.root_dir = self.args['root_dir']
self.mode = self.args['mode']
self.dataset_file = f'SID_{self.args["mode"]}.info'
with open(f"infos/{self.dataset_file}", 'rb') as info_file:
self.infos = pkl.load(info_file)
print(f'>> Successfully load "{self.dataset_file}" (Length: {len(self.infos)})')
self.datapath = [info['long'] for info in self.infos]
self.names = [info['name'] for info in self.infos]
self.buffer = [None] * len(self.infos)
if 'cache' in self.args['command']:
log(f'Loading {len(self.infos)} crops!!!')
self.buffer = [dataload(path) for path in tqdm(self.datapath)]
self.length = len(self.infos)
self.sigma = -1
self.get_shape()
def __len__(self):
return self.length
def get_shape(self):
self.H, self.W = self.args['H'], self.args['W']
self.C = 3
self.h = self.H // 2
self.w = self.W // 2
self.c = 4
def data_aug(self, data, mode=0):
if mode == 0: return data
rot = mode % 4
flip = mode // 4
data = np.rot90(data, k=rot, axes=(-2, -1))
if flip:
data = data[..., ::-1]
return data
def init_random_crop_point(self, mode='non-overlapped', raw_crop=False):
self.h_start = []
self.w_start = []
self.h_end = []
self.w_end = []
self.aug = np.random.randint(8, size=self.args['crop_per_image'])
h, w = self.h, self.w
if raw_crop:
h, w = self.H, self.W
if mode == 'non-overlapped':
nh = h // self.args["patch_size"]
nw = w // self.args["patch_size"]
h_start = np.random.randint(0, h - nh*self.args["patch_size"] + 1)
w_start = np.random.randint(0, w - nw*self.args["patch_size"] + 1)
for i in range(nh):
for j in range(nw):
self.h_start.append(h_start + i * self.args["patch_size"])
self.w_start.append(w_start + j * self.args["patch_size"])
self.h_end.append(h_start + (i+1) * self.args["patch_size"])
self.w_end.append(w_start + (j+1) * self.args["patch_size"])
else: # random_crop
for i in range(self.args['crop_per_image']):
h_start = np.random.randint(0, h - self.args["patch_size"] + 1)
w_start = np.random.randint(0, w - self.args["patch_size"] + 1)
self.h_start.append(h_start)
self.w_start.append(w_start)
self.h_end.append(h_start + self.args["patch_size"])
self.w_end.append(w_start + self.args["patch_size"])
def random_crop(self, img):
        # Randomly crop the numpy array into crop_per_image square patches with side length patch_size
c, h, w = img.shape
        # Allocate an empty numpy canvas: [crops, c, patch_size, patch_size]
crops = np.empty((self.args["crop_per_image"], c, self.args["patch_size"], self.args["patch_size"]), dtype=np.float32)
        # Copy each cropped patch into the canvas
for i in range(self.args["crop_per_image"]):
crop = img[:, self.h_start[i]:self.h_end[i], self.w_start[i]:self.w_end[i]]
            # crop = self.data_aug(crop, mode=self.aug[i])  # disabled: causes grid artifacts
crops[i] = crop
return crops
def __getitem__(self, idx):
data = {}
        # Load data
data['wb'] = self.infos[idx]['wb']
data['ccm'] = self.infos[idx]['ccm']
data['name'] = self.infos[idx]['name']
if self.buffer[idx] is None:
self.buffer[idx] = dataload(self.datapath[idx])
hr_raw = (self.buffer[idx].astype(np.float32) - self.args['bl']) / (self.args['wp'] - self.args['bl'])
        # BayerAug: this rotation is used to change the Bayer pattern
data['pattern'] = np.random.randint(4) if self.args["mode"] == 'train' else idx%4
hr_raw = np.rot90(hr_raw, k=data['pattern'], axes=(-2,-1))
hr_raw = bayer2rggb(hr_raw).clip(0, 1).transpose(2,0,1)
        # Simulate the GT value range after VST: y = sqrt(x + 3/8) ≈ sqrt(x)
data['vst_aug'] = True if np.random.randint(2) else False
hr_raw = hr_raw ** 0.5 if data['vst_aug'] else hr_raw
if self.args["mode"] == 'train':
            # Randomly crop into crop_per_image patches
self.init_random_crop_point(mode=self.args['croptype'], raw_crop=False)
if data['pattern'] % 2:
self.h_start, self.h_end, self.w_start, self.w_end = self.w_start, self.w_end, self.h_start, self.h_end
hr_crops = self.random_crop(hr_raw)
else:
setup_seed(idx)
hr_crops = hr_raw[None,:]
lr_shape = hr_crops.shape
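        # Optional white-balance augmentation: sample random gains and rescale the
        # R / B channels, folding the change back into data['wb'].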
if self.args["lock_wb"] is False and np.random.randint(2):
rgb_gain, red_gain, blue_gain = random_gains()
red_gain = data['wb'][0] / red_gain.numpy()
blue_gain = data['wb'][2] / blue_gain.numpy()
hr_crops *= rgb_gain.numpy()
hr_crops[:,0] = hr_crops[:,0] * red_gain
hr_crops[:,2] = hr_crops[:,2] * blue_gain
data['wb'][0] = red_gain
data['wb'][2] = blue_gain
lr_crops = hr_crops.copy()
        # Add synthetic noise
if self.args['gpu_preprocess'] is False:
if self.args['mode'] == 'train':
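                # Sample the AWGN level log-uniformly in [sigma_min, sigma_max] / 255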
lower, upper = np.log(self.args['sigma_min']), np.log(self.args['sigma_max'])
data['sigma'] = np.exp(np.random.rand()*(upper-lower)+lower) / 255.
else:
data['sigma'] = self.sigma
setup_seed(idx)
noise = np.random.randn(*lr_crops.shape) * data['sigma']
lr_crops += noise
data["lr"] = np.ascontiguousarray(lr_crops)
data["hr"] = np.ascontiguousarray(hr_crops)
if self.args['clip']:
data["lr"] = lr_crops.clip(0, 1)
data["hr"] = hr_crops.clip(0, 1)
return data
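
# A minimal usage sketch (illustrative only): the extra keys referenced above
# ('H', 'W', 'patch_size', 'croptype', 'sigma_min', 'sigma_max', 'clip') are
# normally supplied by the training config, so the values below are placeholders.
#
#   train_args = {'mode': 'train', 'H': 2848, 'W': 4256, 'patch_size': 128,
#                 'croptype': 'non-overlapped', 'crop_per_image': 8,
#                 'sigma_min': 1, 'sigma_max': 50, 'clip': True}
#   train_set = SID_Raw_Dataset(train_args)
#   train_loader = DataLoader(train_set, batch_size=1, shuffle=True, num_workers=4)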
# Unprocess Synthetic Dataset(sRGB->Raw)
class RGB_Img2Raw_Dataset(Dataset):
def __init__(self, args=None):
super().__init__()
self.default_args()
if args is not None:
for key in args:
self.args[key] = args[key]
self.initialization()
def default_args(self):
self.args = {}
self.args['root_dir'] = 'YOND'
self.args['crop_size'] = 256
self.args['ori'] = False
self.args['iso'] = None
self.args['dgain'] = None
self.args['params'] = None
self.args['lock_wb'] = False
self.args['gpu_preprocess'] = False
self.args['dstname'] = 'YOND'
self.args['mode'] = 'train'
self.args['command'] = ''
def initialization(self):
        # Collect the data paths
self.suffix = 'npy'
self.root_dir = self.args['root_dir']
self.mode = self.args['mode']
self.data_dir = f"{self.root_dir}/{self.mode}"
if self.mode == 'train':
self.data_dir += f'_{self.args["subname"]}'
self.datapath = sorted(glob.glob(f'{self.data_dir}/*.{self.suffix}'))
self.names = [os.path.basename(path)[:-4] for path in self.datapath]
self.infos = [{'name':name, 'path':path} for name, path in zip(self.names, self.datapath)]
self.buffer = [None] * len(self.infos)
if 'cache' in self.args['command']:
log(f'Loading {len(self.infos)} crops!!!')
self.buffer = [dataload(path) for path in tqdm(self.datapath)]
self.length = len(self.infos)
self.sigma = -1
self.get_shape()
log(f'Successfully cache {self.length} npy data!!!')
def __len__(self):
return self.length
def get_shape(self):
self.H, self.W = self.args['H'], self.args['W']
self.C = 3
self.h = self.H // 2
self.w = self.W // 2
self.c = 4
def data_aug(self, data, mode=0):
if mode == 0: return data
rot = mode % 4
flip = mode // 4
data = np.rot90(data, k=rot, axes=(-2, -1))
if flip:
data = data[..., ::-1]
return data
def __getitem__(self, idx):
data = {}
        # Load data
data['name'] = self.infos[idx]['name']
if self.buffer[idx] is None:
max_val = 255.
hr_imgs = dataload(self.datapath[idx]).astype(np.float32) / max_val
else:
max_val = 65535. if self.buffer[idx].dtype == np.uint16 else 255.
hr_imgs = self.buffer[idx].astype(np.float32) / max_val
if self.args["mode"] == 'train':
data['aug_id1'] = np.random.randint(8)
self.data_aug(hr_imgs, data['aug_id1'])
else:
setup_seed(idx)
hr_crops = hr_imgs
        # RAW needs the full unprocessing pipeline (sRGB -> synthetic raw)
lr_shape = hr_crops.shape
hr_crops = torch.from_numpy(hr_crops)
hr_crops, metadata = unprocess(hr_crops, lock_wb=self.args["lock_wb"], use_gpu=self.args['gpu_preprocess'])
data['wb'] = np.array([metadata['red_gain'].item(), 1., metadata['blue_gain'].item()])
data['ccm'] = metadata['cam2rgb'].numpy()
if 'blur' in self.args['command']:
lr_crops = hr_crops.clone().numpy()
lr_crops = cv2.GaussianBlur(lr_crops, (3,3), -1)
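            # Sample the R / Gr / Gb / B sites of the RGGB layout from the blurred RGB
            # image to build the 4-channel noisy-branch input.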
red = lr_crops[0::2, 0::2, 0]
green_red = lr_crops[0::2, 1::2, 1]
green_blue = lr_crops[1::2, 0::2, 1]
blue = lr_crops[1::2, 1::2, 2]
lr_crops = np.stack((red, green_red, green_blue, blue), axis=0)
hr_crops = mosaic(hr_crops).numpy() # rggb
        # This rotation is used to change the Bayer pattern
if 'no_bayeraug' in self.args["command"]:
data['pattern'] = 0
else:
if self.args["mode"] == 'train':
data['pattern'] = np.random.randint(4)
else:
data['pattern'] = idx%4
hr_crops = bayer_aug(hr_crops, k=data['pattern'])
data['vst_aug'] = False
hr_crops = hr_crops ** 0.5 if data['vst_aug'] else hr_crops
        # [h,w,c] -> [c,h,w]
hr_crops = hr_crops.transpose(2,0,1)
if 'blur' not in self.args['command']:
lr_crops = hr_crops.copy()
        # Add synthetic noise
if self.args['gpu_preprocess'] is False:
if self.args['mode'] == 'train':
lower, upper = np.log(self.args['sigma_min']), np.log(self.args['sigma_max'])
data['sigma'] = np.exp(np.random.rand()*(upper-lower)+lower) / 255.
else:
data['sigma'] = self.sigma
setup_seed(idx)
noise = np.random.randn(*lr_crops.shape) * data['sigma']
lr_crops += noise
data["lr"] = np.ascontiguousarray(lr_crops)
data["hr"] = np.ascontiguousarray(hr_crops)
if self.args['clip']:
data["lr"] = lr_crops.clip(0, 1)
data["hr"] = hr_crops.clip(0, 1)
return data
# Unprocess Synthetic Dataset(sRGB->Raw)
class RGB_Img2Raw_Syn_Dataset(Dataset):
def __init__(self, args=None):
super().__init__()
self.default_args()
if args is not None:
for key in args:
self.args[key] = args[key]
self.initialization()
def default_args(self):
self.args = {}
self.args['root_dir'] = 'YOND'
self.args['crop_size'] = 256
self.args['ori'] = False
self.args['iso'] = None
self.args['dgain'] = None
self.args['params'] = None
self.args['lock_wb'] = False
self.args['gpu_preprocess'] = False
self.args['dstname'] = 'YOND'
self.args['mode'] = 'train'
self.args['command'] = ''
def initialization(self):
        # Collect the data paths
self.suffix = 'npy'
self.root_dir = self.args['root_dir']
self.mode = self.args['mode']
self.data_dir = f"{self.root_dir}/{self.mode}"
if self.mode == 'train':
self.data_dir += f'_{self.args["subname"]}'
self.datapath = sorted(glob.glob(f'{self.data_dir}/*.{self.suffix}'))
self.names = [os.path.basename(path)[:-4] for path in self.datapath]
self.infos = [{'name':name, 'path':path} for name, path in zip(self.names, self.datapath)]
self.buffer = [None] * len(self.infos)
if 'cache' in self.args['command']:
log(f'Loading {len(self.infos)} crops!!!')
self.buffer = [dataload(path) for path in tqdm(self.datapath)]
self.length = len(self.infos)
self.sigma = -1
self.get_shape()
log(f'Successfully cache {self.length} npy data!!!')
def __len__(self):
return self.length
def get_shape(self):
self.H, self.W = self.args['H'], self.args['W']
self.C = 3
self.h = self.H // 2
self.w = self.W // 2
self.c = 4
def data_aug(self, data, mode=0):
if mode == 0: return data
rot = mode % 4
flip = mode // 4
data = np.rot90(data, k=rot, axes=(-2, -1))
if flip:
data = data[..., ::-1]
return data
def __getitem__(self, idx):
data = {}
        # Load data
data['name'] = self.infos[idx]['name']
if self.buffer[idx] is None:
self.buffer[idx] = dataload(self.datapath[idx])
max_val = 65535. if self.buffer[idx].dtype == np.uint16 else 255.
hr_imgs = self.buffer[idx].astype(np.float32) / max_val
if self.args["mode"] == 'train':
data['aug_id1'] = np.random.randint(8)
self.data_aug(hr_imgs, data['aug_id1'])
else:
setup_seed(idx)
hr_crops = hr_imgs
        # RAW needs the full unprocessing pipeline (sRGB -> synthetic raw)
lr_shape = hr_crops.shape
hr_crops = torch.from_numpy(hr_crops)
hr_crops, metadata = unprocess(hr_crops, lock_wb=self.args["lock_wb"], use_gpu=self.args['gpu_preprocess'])
data['wb'] = np.array([metadata['red_gain'].item(), 1., metadata['blue_gain'].item()])
data['ccm'] = metadata['cam2rgb'].numpy()
        hr_crops = mosaic(hr_crops).numpy()  # rggb
        # This rotation is used to change the Bayer pattern
if 'no_bayeraug' in self.args["command"]:
data['pattern'] = 0
else:
if self.args["mode"] == 'train':
data['pattern'] = np.random.randint(4)
else:
data['pattern'] = idx%4
hr_crops = bayer_aug(hr_crops, k=data['pattern'])
data['vst_aug'] = False
hr_crops = hr_crops ** 0.5 if data['vst_aug'] else hr_crops
        # [h,w,c] -> [c,h,w]
hr_crops = hr_crops.transpose(2,0,1)
lr_crops = hr_crops.copy()
        # Add synthetic noise
if self.args['gpu_preprocess'] is False:
if self.args['mode'] == 'train':
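                # Train: sample a physics-based noise profile for the camera and synthesize a
                # noisy observation, then map both crops to the VST domain. After the
                # variance-stabilizing transform the Poisson-Gaussian noise is approximately
                # unit-variance Gaussian; the closed-form VST bias is subtracted from the noisy
                # branch, both crops are rescaled to [0, 1], and the normalized noise level
                # (1 / VST range) is exposed as data['sigma'].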
p = sample_params(self.args['camera_type'])
p['ratio'] = np.random.rand(1) * 19 + 1
lr_crops = generate_noisy_obs(lr_crops, param=p, noise_code=self.args['noise_code'], ori=self.args['ori'])
if self.args['ori']:
hr_crops = hr_crops / p['ratio']
else:
p['K'] = p['K'] * p['ratio']
p['sigGs'] = p['sigGs'] * p['ratio']
hr_crops = hr_crops * (p['wp'] - p['bl'])
lr_crops = lr_crops * (p['wp'] - p['bl'])
bias = close_form_bias(hr_crops, p['sigGs'], p['K'])
hr_crops = VST(hr_crops, p['sigGs'], gain=p['K'])
lr_crops = VST(lr_crops, p['sigGs'], gain=p['K'])
lr_crops = lr_crops - bias
lower = VST(0, p['sigGs'], gain=p['K'])
upper = VST((p['wp'] - p['bl']), p['sigGs'], gain=p['K'])
nsr = 1 / (upper - lower)
hr_crops = (hr_crops - lower) / (upper - lower)
lr_crops = (lr_crops - lower) / (upper - lower)
data['sigma'] = nsr
else:
data['sigma'] = self.sigma
setup_seed(idx)
noise = np.random.randn(*lr_crops.shape) * data['sigma']
lr_crops += noise
data["lr"] = np.ascontiguousarray(lr_crops)
data["hr"] = np.ascontiguousarray(hr_crops)
if self.args['clip']:
data["lr"] = lr_crops.clip(0, 1)
data["hr"] = hr_crops.clip(0, 1)
return data
# Unprocess Synthetic Dataset(sRGB->Raw)
class RGB_Img2Raw_SFRN_Dataset(Dataset):
def __init__(self, args=None):
super().__init__()
self.default_args()
if args is not None:
for key in args:
self.args[key] = args[key]
self.initialization()
def default_args(self):
self.args = {}
self.args['root_dir'] = 'YOND'
self.args['crop_size'] = 256
self.args['ori'] = False
self.args['iso'] = None
self.args['dgain'] = None
self.args['params'] = None
self.args['lock_wb'] = False
self.args['gpu_preprocess'] = False
self.args['dstname'] = 'YOND'
self.args['mode'] = 'train'
self.args['command'] = ''
def initialization(self):
        # Collect the data paths
self.suffix = 'npy'
self.root_dir = self.args['root_dir']
self.mode = self.args['mode']
self.data_dir = f"{self.root_dir}/{self.mode}"
if self.mode == 'train':
self.data_dir += f'_{self.args["subname"]}'
self.datapath = sorted(glob.glob(f'{self.data_dir}/*.{self.suffix}'))
self.names = [os.path.basename(path)[:-4] for path in self.datapath]
self.infos = [{'name':name, 'path':path} for name, path in zip(self.names, self.datapath)]
self.buffer = [None] * len(self.infos)
if 'cache' in self.args['command']:
log(f'Loading {len(self.infos)} crops!!!')
self.buffer = [dataload(path) for path in tqdm(self.datapath)]
self.legal_iso = sorted([int(iso) for iso in os.listdir(f"{self.args['bias_dir']}")])
self.bias_frames = {iso: sorted(glob.glob(f"{self.args['bias_dir']}/{iso}/*.npy")) for iso in self.legal_iso}
# self.buffer_lr = [bayer2rggb(dataload(path)).transpose(2,0,1) for path in tqdm(self.bias_frames)]
self.buffer_lr = {}
for iso in self.legal_iso:
log(f'Loading {len(self.bias_frames[iso])} bias frames (ISO-{iso})!!!')
self.buffer_lr[iso] = [bayer2rggb(dataload(path)).transpose(2,0,1) for path in tqdm(self.bias_frames[iso])]
self.bias_lut = BiasLUT()
self.length = len(self.infos)
self.sigma = -1
self.get_shape()
log(f'Successfully cache {self.length} npy data!!!')
def __len__(self):
return self.length
def get_shape(self):
self.H, self.W = self.args['H'], self.args['W']
self.C = 3
self.h = self.H // 2
self.w = self.W // 2
self.c = 4
def data_aug(self, data, mode=0):
if mode == 0: return data
rot = mode % 4
flip = mode // 4
data = np.rot90(data, k=rot, axes=(-2, -1))
if flip:
data = data[..., ::-1]
return data
def __getitem__(self, idx):
data = {}
        # Load data
data['name'] = self.infos[idx]['name']
hr_imgs = dataload(self.datapath[idx])
max_val = 65535. if hr_imgs.dtype == np.uint16 else 255.
hr_imgs = hr_imgs.astype(np.float32) / max_val
if self.args["mode"] == 'train':
data['aug_id1'] = np.random.randint(8)
self.data_aug(hr_imgs, data['aug_id1'])
else:
setup_seed(idx)
hr_crops = hr_imgs
        # RAW needs the full unprocessing pipeline (sRGB -> synthetic raw)
lr_shape = hr_crops.shape
hr_crops = torch.from_numpy(hr_crops)
hr_crops, metadata = unprocess(hr_crops, lock_wb=self.args["lock_wb"], use_gpu=self.args['gpu_preprocess'])
data['wb'] = np.array([metadata['red_gain'].item(), 1., metadata['blue_gain'].item()])
data['ccm'] = metadata['cam2rgb'].numpy()
if 'blur' in self.args['command']:
lr_crops = hr_crops.clone().numpy()
lr_crops = cv2.GaussianBlur(lr_crops, (3,3), 0.4)
red = lr_crops[0::2, 0::2, 0]
green_red = lr_crops[0::2, 1::2, 1]
green_blue = lr_crops[1::2, 0::2, 1]
blue = lr_crops[1::2, 1::2, 2]
lr_crops = np.stack((red, green_red, green_blue, blue), axis=0)
hr_crops = mosaic(hr_crops).numpy() # rggb
        # This rotation is used to change the Bayer pattern
if 'no_bayeraug' in self.args["command"]:
data['pattern'] = 0
else:
if self.args["mode"] == 'train':
data['pattern'] = np.random.randint(4)
else:
data['pattern'] = idx%4
hr_crops = bayer_aug(hr_crops, k=data['pattern'])
data['vst_aug'] = False
hr_crops = hr_crops ** 0.5 if data['vst_aug'] else hr_crops
        # [h,w,c] -> [c,h,w]
hr_crops = hr_crops.transpose(2,0,1)
if 'blur' not in self.args['command']:
lr_crops = hr_crops.copy()
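        # SFRN-style read noise: pick a random ISO and a random crop from a real bias
        # (dark) frame captured at that ISO; this crop supplies the real read-noise sample.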
data['ISO'] = iso = self.legal_iso[np.random.randint(len(self.legal_iso))]
idr = np.random.randint(len(self.buffer_lr[iso]))
dh, dw = self.args['patch_size']//2, self.args['patch_size']//2
xx = np.random.randint(self.h - dh + 1)
yy = np.random.randint(self.w - dw + 1)
black_crops = self.buffer_lr[iso][idr][:, xx:xx+dh, yy:yy+dw].copy()
        # Add synthetic noise
data['ratio'] = np.ones(1)
if self.args['gpu_preprocess'] is False:
if self.args['mode'] == 'train':
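                # Noise composition: shot noise via Poisson sampling with gain K on the
                # darkened signal, plus read noise built from the real bias-frame crop,
                # per-row noise (sigR), quantization noise (uniform in [-0.5, 0.5]) and a
                # global black-level error; sigGs is then re-estimated from the composite
                # read noise.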
p = sample_params_max(self.args['camera_type'], iso=iso)
p['ratio'] = np.exp(np.random.rand() * 4)
data['ratio'] = p['ratio']
y = lr_crops * (p['wp'] - p['bl']) / p['ratio']
y = np.random.poisson(y/p['K']).astype(np.float32) * p['K']
n_read = black_crops
n_read += np.random.randn(y.shape[-3], y.shape[-2], 1).astype(np.float32) * p['sigR']
n_read += np.random.uniform(low=-0.5, high=0.5, size=y.shape)
                n_read += np.random.randn() * 0.03  # BLE (black level error)
y += n_read
lr_crops = y / (p['wp'] - p['bl'])
p['sigGs'] = n_read.std()
if self.args['ori']:
hr_crops = hr_crops / p['ratio']
else:
lr_crops = lr_crops * p['ratio']
p['K'] = p['K'] * p['ratio']
p['sigGs'] = p['sigGs'] * p['ratio']
hr_crops = hr_crops * (p['wp'] - p['bl'])
lr_crops = lr_crops * (p['wp'] - p['bl'])
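                # Map to the VST domain: the VST bias is read from a precomputed LUT
                # (BiasLUT) evaluated on the clipped noisy signal instead of the closed
                # form, then both crops are normalized to [0, 1] and the resulting NSR
                # is exposed as data['sigma'].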
bias = self.bias_lut.get_lut(lr_crops.clip(0,None), p['sigGs'], p['K'])
# bias = close_form_bias(lr_crops.clip(0,None), p['sigGs'], p['K'])
hr_crops = VST(hr_crops, p['sigGs'], gain=p['K'])
lr_crops = VST(lr_crops, p['sigGs'], gain=p['K'])
lr_crops = lr_crops - bias
lower = VST(0, p['sigGs'], gain=p['K'])
upper = VST((p['wp'] - p['bl']), p['sigGs'], gain=p['K'])
nsr = 1 / (upper - lower)
hr_crops = (hr_crops - lower) / (upper - lower)
lr_crops = (lr_crops - lower) / (upper - lower)
data['sigma'] = nsr
else:
data['sigma'] = self.sigma
setup_seed(idx)
noise = np.random.randn(*lr_crops.shape) * data['sigma']
lr_crops += noise
data["lr"] = np.ascontiguousarray(lr_crops)
data["hr"] = np.ascontiguousarray(hr_crops)
if self.args['clip']:
data["lr"] = lr_crops.clip(0, 1)
data["hr"] = hr_crops.clip(0, 1)
return data
# Synthetic Dataset(sRGB)
class RGB_Img_Dataset(Dataset):
def __init__(self, args=None):
super().__init__()
self.default_args()
if args is not None:
for key in args:
self.args[key] = args[key]
self.initialization()
def default_args(self):
self.args = {}
self.args['root_dir'] = 'YOND'
self.args['crop_size'] = 256
self.args['ori'] = False
self.args['iso'] = None
self.args['dgain'] = None
self.args['params'] = None
self.args['lock_wb'] = False
self.args['gpu_preprocess'] = False
self.args['dstname'] = 'YOND'
self.args['mode'] = 'train'
self.args['command'] = ''
def initialization(self):
        # Collect the data paths
self.suffix = 'npy'
self.root_dir = self.args['root_dir']
self.mode = self.args['mode']
self.data_dir = f"{self.root_dir}/{self.mode}"
if self.mode == 'train':
self.data_dir += f'_{self.args["subname"]}'
self.datapath = sorted(glob.glob(f'{self.data_dir}/*.{self.suffix}'))
self.names = [os.path.basename(path)[:-4] for path in self.datapath]
self.infos = [{'name':name, 'path':path} for name, path in zip(self.names, self.datapath)]
self.buffer = [None] * len(self.infos)
if 'cache' in self.args['command']:
log(f'Loading {len(self.infos)} crops!!!')
self.buffer = [dataload(path) for path in tqdm(self.datapath)]
self.length = len(self.infos)
self.sigma = -1
self.get_shape()
log(f'Successfully cache {self.length} npy data!!!')
def __len__(self):
return self.length
def get_shape(self):
self.H, self.W = self.args['H'], self.args['W']
self.C = 3
self.h = self.H // 2
self.w = self.W // 2
self.c = 4
def data_aug(self, data, mode=0):
if mode == 0: return data
rot = mode % 4
flip = mode // 4
data = np.rot90(data, k=rot, axes=(-2, -1))
if flip:
data = data[..., ::-1]
return data
def __getitem__(self, idx):
data = {}
        # Load data
data['name'] = self.infos[idx]['name']
if self.buffer[idx] is None:
self.buffer[idx] = dataload(self.datapath[idx])
max_val = 255. if self.buffer[idx].dtype == np.uint8 else 65535.
hr_imgs = self.buffer[idx].astype(np.float32) / max_val
if self.args["mode"] == 'train':
data['aug_id1'] = np.random.randint(8)
self.data_aug(hr_imgs, data['aug_id1'])
else:
setup_seed(idx)
hr_crops = hr_imgs.transpose(2,0,1)
lr_crops = hr_crops.copy()
        # Add synthetic noise
if self.args['gpu_preprocess'] is False:
if self.args['mode'] == 'train':
# lower, upper = np.log(self.args['sigma_min']), np.log(self.args['sigma_max'])
# data['sigma'] = np.exp(np.random.rand()*(upper-lower)+lower) / 255.
lower, upper = self.args['sigma_min'], self.args['sigma_max']
data['sigma'] = (np.random.rand()*(upper-lower)+lower) / 255.
else:
data['sigma'] = self.sigma
setup_seed(idx)
noise = np.random.randn(*lr_crops.shape) * data['sigma']
lr_crops += noise
data["lr"] = np.ascontiguousarray(lr_crops)
data["hr"] = np.ascontiguousarray(hr_crops)
if self.args['clip']:
data["lr"] = lr_crops.clip(0, 1)
data["hr"] = hr_crops.clip(0, 1)
return data
# Unprocess Synthetic Dataset(sRGB->Raw)
class DIV2K_Img2Raw_Dataset(Dataset):
def __init__(self, args=None):
super().__init__()
self.default_args()
if args is not None:
for key in args:
self.args[key] = args[key]
self.initialization()
def default_args(self):
self.args = {}
self.args['root_dir'] = 'DIV2K'
self.args['crop_size'] = 256
self.args['ori'] = False
self.args['iso'] = None
self.args['dgain'] = None
self.args['params'] = None
self.args['lock_wb'] = False
self.args['gpu_preprocess'] = False
self.args['dstname'] = 'DIV2K'
self.args['mode'] = 'train'
self.args['command'] = ''
def initialization(self):
        # Collect the data paths
self.suffix = 'npy'
self.root_dir = self.args['root_dir']
self.mode = self.args['mode']
self.data_dir = f"{self.root_dir}/npy/{self.mode}"
self.datapath = sorted(glob.glob(f'{self.data_dir}/*.{self.suffix}'))
self.names = [os.path.basename(path)[:-4] for path in self.datapath]
self.infos = [{'name':name, 'path':path} for name, path in zip(self.names, self.datapath)]
self.buffer = [None] * len(self.infos)
if 'cache' in self.args['command']:
log(f'Loading {len(self.infos)} crops!!!')
self.buffer = [dataload(path) for path in tqdm(self.datapath)]
self.length = len(self.infos)
self.sigma = -1
self.get_shape()
log(f'Successfully cache {self.length} npy data!!!')
def __len__(self):
return self.length
def get_shape(self):
self.H, self.W = self.args['H'], self.args['W']
self.C = 3
self.h = self.H // 2
self.w = self.W // 2
self.c = 4
def data_aug(self, data, mode=0):
if mode == 0: return data
rot = mode % 4
flip = mode // 4
data = np.rot90(data, k=rot, axes=(-2, -1))
if flip:
data = data[..., ::-1]
return data
def __getitem__(self, idx):
data = {}
        # Load data
data['name'] = self.infos[idx]['name']
if self.buffer[idx] is None:
self.buffer[idx] = dataload(self.datapath[idx])
hr_imgs = self.buffer[idx].astype(np.float32) / 255.
if self.args["mode"] == 'train':
data['aug_id1'] = np.random.randint(8)
self.data_aug(hr_imgs, data['aug_id1'])
else:
setup_seed(idx)
hr_crops = hr_imgs
        # RAW needs the full unprocessing pipeline (sRGB -> synthetic raw)
lr_shape = hr_crops.shape
hr_crops = torch.from_numpy(hr_crops)
hr_crops, metadata = unprocess(hr_crops, lock_wb=self.args["lock_wb"], use_gpu=self.args['gpu_preprocess'])
data['wb'] = np.array([metadata['red_gain'].item(), 1., metadata['blue_gain'].item()])
data['ccm'] = metadata['cam2rgb'].numpy()
        hr_crops = mosaic(hr_crops).numpy()  # rggb
        # This rotation is used to change the Bayer pattern
# if self.args["mode"] == 'train':
data['pattern'] = np.random.randint(4) if self.args["mode"] == 'train' else idx%4
hr_crops = bayer_aug(hr_crops, k=data['pattern'])
data['vst_aug'] = False
hr_crops = hr_crops ** 0.5 if data['vst_aug'] else hr_crops
        # [h,w,c] -> [c,h,w]
hr_crops = hr_crops.transpose(2,0,1)
lr_crops = hr_crops.copy()
        # Add synthetic noise
if self.args['gpu_preprocess'] is False:
if self.args['mode'] == 'train':
lower, upper = np.log(self.args['sigma_min']), np.log(self.args['sigma_max'])
data['sigma'] = np.exp(np.random.rand()*(upper-lower)+lower) / 255.
else:
data['sigma'] = self.sigma
setup_seed(idx)
noise = np.random.randn(*lr_crops.shape) * data['sigma']
lr_crops += noise
data["lr"] = np.ascontiguousarray(lr_crops)
data["hr"] = np.ascontiguousarray(hr_crops)
if self.args['clip']:
data["lr"] = lr_crops.clip(0, 1)
data["hr"] = hr_crops.clip(0, 1)
return data
# Unprocess Synthetic Dataset(sRGB->Raw3c1n)
class RGB_Img2Raw3c1n_Dataset(Dataset):
def __init__(self, args=None):
super().__init__()
self.default_args()
if args is not None:
for key in args:
self.args[key] = args[key]
self.initialization()
def default_args(self):
self.args = {}
self.args['root_dir'] = 'YOND'
self.args['crop_size'] = 256
self.args['ori'] = False
self.args['iso'] = None
self.args['dgain'] = None
self.args['params'] = None
self.args['lock_wb'] = False
self.args['gpu_preprocess'] = False
self.args['dstname'] = 'YOND'
self.args['mode'] = 'train'
self.args['command'] = ''
def initialization(self):
        # Collect the data paths
self.suffix = 'npy'
self.root_dir = self.args['root_dir']
self.mode = self.args['mode']
self.data_dir = f"{self.root_dir}/{self.mode}"
if self.mode == 'train':
self.data_dir += f'_{self.args["subname"]}'
self.datapath = sorted(glob.glob(f'{self.data_dir}/*.{self.suffix}'))
self.names = [os.path.basename(path)[:-4] for path in self.datapath]
self.infos = [{'name':name, 'path':path} for name, path in zip(self.names, self.datapath)]
self.buffer = [None] * len(self.infos)
if 'cache' in self.args['command']:
log(f'Loading {len(self.infos)} crops!!!')
self.buffer = [dataload(path) for path in tqdm(self.datapath)]
self.length = len(self.infos)
self.sigma = -1
self.get_shape()
log(f'Successfully cache {self.length} npy data!!!')
def __len__(self):
return self.length
def get_shape(self):
self.H, self.W = self.args['H'], self.args['W']
self.C = 3
self.h = self.H // 2
self.w = self.W // 2
self.c = 4
def data_aug(self, data, mode=0):
if mode == 0: return data
rot = mode % 4
flip = mode // 4
data = np.rot90(data, k=rot, axes=(-2, -1))
if flip:
data = data[..., ::-1]
return data
def __getitem__(self, idx):
data = {}
        # Load data
data['name'] = self.infos[idx]['name']
if self.buffer[idx] is None:
self.buffer[idx] = dataload(self.datapath[idx])
max_val = 255. if self.buffer[idx].dtype == np.uint8 else 65535.
hr_imgs = self.buffer[idx].astype(np.float32) / max_val
if self.args["mode"] == 'train':
data['aug_id1'] = np.random.randint(8)
self.data_aug(hr_imgs, data['aug_id1'])
else:
setup_seed(idx)
hr_crops = hr_imgs
        # RAW needs the full unprocessing pipeline (sRGB -> synthetic raw)
lr_shape = hr_crops.shape
hr_crops = torch.from_numpy(hr_crops)
hr_crops, metadata = unprocess(hr_crops, lock_wb=self.args["lock_wb"], use_gpu=self.args['gpu_preprocess'])
data['wb'] = np.array([metadata['red_gain'].item(), 1., metadata['blue_gain'].item()])
data['ccm'] = metadata['cam2rgb'].numpy()
hr_crops = mosaic(hr_crops).numpy() # rggb
        # [h,w,c] -> [c,h,w]
hr_crops = hr_crops.transpose(2,0,1)
lr_crops = hr_crops.copy()
        # Add synthetic noise
if self.args['gpu_preprocess'] is False:
if self.args['mode'] == 'train':
lower, upper = np.log(self.args['sigma_min']), np.log(self.args['sigma_max'])
data['sigma'] = np.exp(np.random.rand()*(upper-lower)+lower) / 255.
else:
data['sigma'] = self.sigma
setup_seed(idx)
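            # "3c1n": Gaussian noise is injected into only one of the four packed
            # channels (index 2, i.e. the Gb plane of the RGGB packing); the other
            # three channels stay clean.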
noise = np.random.randn(*lr_crops.shape[-2:]) * data['sigma']
lr_crops[2] += noise
data["lr"] = np.ascontiguousarray(lr_crops)
data["hr"] = np.ascontiguousarray(hr_crops)
if self.args['clip']:
data["lr"] = lr_crops.clip(0, 1)
data["hr"] = hr_crops.clip(0, 1)
return data
# Unprocess Synthetic Dataset(sRGB->Raw)
class DIV2K_PG_Dataset(DIV2K_Img2Raw_Dataset):
def __init__(self, args=None):
super().__init__(args)
self.noise_params= {
'Kmin': -2.5, 'Kmax': 3.5, 'lam': 0.102, 'q': 1/(2**10), 'wp': 1023, 'bl': 64,
'sigTLk': 0.85187, 'sigTLb': 0.07991, 'sigTLsig': 0.02921,
'sigRk': 0.87611, 'sigRb': -2.11455, 'sigRsig': 0.03274,
'sigGsk': 0.85187, 'sigGsb': 0.67991, 'sigGssig': 0.02921,
}
self.p = self.get_noise_params()
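    # Sample a Poisson-Gaussian noise profile: log(K) is drawn uniformly in
    # [Kmin, Kmax] and log(sigGs) from a Gaussian whose mean depends linearly on
    # log(K); beta1 = K/scale and beta2 = (sigGs/scale)^2 parameterize the
    # normalized variance model var(x) = beta1 * x + beta2.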
def get_noise_params(self):
p = self.noise_params
log_K = np.random.uniform(low=p['Kmin'], high=p['Kmax'])
mu_Gs = (p['sigGsk'] + np.random.uniform(-0.2, 0.2))*log_K + (p['sigGsb'] + np.random.uniform(-1, 1))
log_sigGs = np.random.normal(loc=mu_Gs, scale=p['sigGssig'])
K = np.exp(log_K)
sigma = np.exp(log_sigGs)
scale = p['wp'] - p['bl']
self.p = {'K':K, 'sigma':sigma, 'beta1':K/scale, 'beta2':(sigma/scale)**2,
'wp':p['wp'], 'bl':p['bl'], 'scale':p['wp']-p['bl']}
return self.p
def __getitem__(self, idx):
data = {}
        # Load data
data['name'] = self.infos[idx]['name']
if self.buffer[idx] is None:
self.buffer[idx] = dataload(self.datapath[idx])
hr_imgs = self.buffer[idx].astype(np.float32) / 255.
if self.args["mode"] == 'train':
data['aug_id1'] = np.random.randint(8)
# self.data_aug(hr_imgs, data['aug_id1'])
else:
setup_seed(idx)
hr_crops = hr_imgs
        # RAW needs the full unprocessing pipeline (sRGB -> synthetic raw)
lr_shape = hr_crops.shape
hr_crops = torch.from_numpy(hr_crops)
hr_crops, metadata = unprocess(hr_crops, lock_wb=self.args["lock_wb"], use_gpu=False)
data['wb'] = np.array([metadata['red_gain'].item(), 1., metadata['blue_gain'].item()])
data['ccm'] = metadata['cam2rgb'].numpy()
        hr_crops = mosaic(hr_crops).numpy()  # rggb
        # This rotation is used to change the Bayer pattern
# if self.args["mode"] == 'train':
data['pattern'] = np.random.randint(4) if self.args["mode"] == 'train' else idx%4
hr_crops = bayer_aug(hr_crops, k=data['pattern'])
lr_crops = hr_crops.copy()
        # Add synthetic noise
if self.args['mode'] == 'train':
p = self.get_noise_params()
data.update(p)
else:
            p = self.p
            data.update(p)
if self.args['gpu_preprocess'] is False:
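            # Poisson-Gaussian synthesis in the normalized domain:
            # lr = Poisson(x / beta1) * beta1 + N(0, beta2), consistent with
            # var(x) = beta1 * x + beta2.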
lr_crops = np.random.poisson(lr_crops/p['beta1'])*p['beta1'] + np.random.randn(*lr_crops.shape) * data['beta2']**0.5
if 'est' in self.args['command']:
            # Local statistics for noise-level estimation: box-filtered means and
            # windowed standard deviations of the noisy and clean crops (window size k)
            k = 19
lr_blur = cv2.blur(lr_crops, (k, k))
hr_blur = cv2.blur(hr_crops, (k, k))
lr_std = stdfilt(lr_crops, k)
hr_std = stdfilt(hr_crops, k)
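            # Regression target for noise estimation: the per-pixel noise std
            # sqrt(beta1 * mean + beta2) evaluated on the clean local mean.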
hr_target = (p['beta1'] * hr_blur + p['beta2']) ** 0.5
var = lr_std**2
mean = lr_blur
th, percent = get_threshold(hr_std)
mask = (hr_std <= th)
            # Split the image into smooth / textured regions by the threshold
if var[mask].size > 0:
var, mean = var[mask], mean[mask]
else:
mask = (hr_std <= hr_std.max())
data['th'] = th
data['hr_mask'] = mask
data['lr_rggb'] = lr_crops
data['hr_rggb'] = hr_crops
data['lr_std'] = lr_std
data['hr_std'] = hr_std
data['lr_blur'] = lr_blur
data['hr_blur'] = hr_blur
data['lr'] = np.concatenate([lr_std, lr_blur, lr_crops], axis=-1)
data['hr'] = hr_target
else:
data["lr"] = np.ascontiguousarray(lr_crops)
data["hr"] = np.ascontiguousarray(hr_crops)
for key in data:
if 'lr' in key or 'hr' in key:
data[key] = data[key].transpose(2,0,1)
if self.args['clip']:
data['lr'] = lr_crops.clip(0, 1)
data['hr'] = hr_crops.clip(0, 1)
return data
# SIDD Paired Real Data
class SIDD_Dataset(Dataset):
def __init__(self, args=None):
super().__init__()
self.default_args()
if args is not None:
for key in args:
self.args[key] = args[key]
self.initialization()
def default_args(self):
self.args = {}
self.args['root_dir'] = '/data/fenghansen/datasets/SIDD'
self.args['params'] = None
self.args['lock_wb'] = False
self.args['gpu_preprocess'] = False
self.args['dstname'] = 'SIDD'
self.args['mode'] = 'eval'
        self.args['clip'] = True
self.args['wp'] = 1023
self.args['bl'] = 64
self.args['patch_size'] = 256
self.args['H'] = 256
self.args['W'] = 256
self.args['command'] = ''
def initialization(self):
        # Collect the data paths
self.suffix = 'npy'
self.root_dir = self.args['root_dir']
self.mode = self.args['mode']
if self.mode == 'train':
self.data_dir = f'{self.root_dir}/SIDD_Medium_Raw/data'
else:
self.data_dir = f'{self.root_dir}/SIDD_Benchmark_Data'
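        # 'eval' reads the official SIDD validation blocks (noisy + GT) from .mat files;
        # other modes load the benchmark noisy blocks and their crop positions (no GT).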
if self.mode == 'eval':
self.lr_data = sio.loadmat(f'{self.root_dir}/SIDD_Validation_Raw/ValidationNoisyBlocksRaw.mat')['ValidationNoisyBlocksRaw']
self.hr_data = sio.loadmat(f'{self.root_dir}/SIDD_Validation_Raw/ValidationGtBlocksRaw.mat')['ValidationGtBlocksRaw']
else:
self.lr_data = sio.loadmat(f'{self.root_dir}/SIDD_Validation_Raw/BenchmarkNoisyBlocksRaw.mat')['BenchmarkNoisyBlocksRaw']
self.hr_data = None
self.pos = sio.loadmat(f'{self.root_dir}/SIDD_Validation_Raw/BenchmarkBlocks32.mat')['BenchmarkBlocks32']
self.names = sorted(os.listdir(self.data_dir))
self.datapaths = sorted(glob.glob(f'{self.data_dir}//*/*_010.MAT'))
self.metapaths = sorted([path for path in self.datapaths if 'META' in path])
self.lr_paths = sorted([path for path in self.datapaths if 'NOISY' in path])
self.hr_paths = sorted([path for path in self.datapaths if 'GT' in path])
self.length = len(self.names)
self.infos = [None] * self.length
for i in range(self.length):
metadata = read_metadata(dataload(self.metapaths[i]))
self.infos[i] = {
'name': self.names[i],
'lr_path': self.lr_paths[i],
'hr_path': self.hr_paths[i] if len(self.hr_paths)>0 else None,
'metadata': metadata,
}
self.sigma = -1
self.get_shape()
log(f'Successfully load {self.length} data!!! ({self.mode})')
def __len__(self):
return self.length
def get_shape(self):
self.H, self.W = self.args['H'], self.args['W']
self.C = 3
self.h = self.H // 2
self.w = self.W // 2
self.c = 4
def data_aug(self, data, mode=0):
if mode == 0: return data
rot = mode % 4
flip = mode // 4
data = np.rot90(data, k=rot, axes=(-2, -1))
if flip:
data = data[..., ::-1]
return data
def __getitem__(self, idx):
data = {}
        # Load data
data['name'] = self.infos[idx]['name']
data['meta'] = self.infos[idx]['metadata']
data['lr_path_full'] = self.infos[idx]['lr_path']
data['hr_path_full'] = self.infos[idx]['hr_path']
data['wb'] = data['meta']['wb']
data['cfa'] = data['meta']['bayer_2by2']
data['ccm'] = data['meta']['cst2']
data['iso'] = data['meta']['iso']
data['reg'] = (data['meta']['beta1'], data['meta']['beta2'])
if self.args["mode"] == 'train':
raise NotImplementedError
else:
data['lr'] = self.lr_data[idx]
if self.mode == 'eval':
data['hr'] = self.hr_data[idx]
return data
class LRID_Dataset(Dataset):
def __init__(self, args=None):
super().__init__()
self.default_args()
if args is not None:
for key in args:
self.args[key] = args[key]
self.initialization()
def default_args(self):
self.args = {}
self.args['root_dir'] = 'LRID/'
self.args['suffix'] = 'dng'
self.args['dgain'] = 1
        self.args['dstname'] = ['indoor_x5']
self.args['camera_type'] = 'IMX686'
self.args['params'] = None
self.args['mode'] = 'eval'
self.args['GT_type'] = 'GT_align_ours'
self.args['command'] = ''
self.args['H'] = 3472
self.args['W'] = 4624
self.args['wp'] = 1023
self.args['bl'] = 64
self.args['clip'] = False
def initialization(self):
        # Collect the data paths
self.suffix = 'dng'
self.change_eval_ratio(ratio=1)
self.iso = 6400
def __len__(self):
return self.length
def get_shape(self):
self.shape = self.templet_raw.raw_image_visible.shape
self.H, self.W = self.shape
if 'bl' not in self.args:
self.bl_all = np.array(self.templet_raw.black_level_per_channel)
if np.mean(self.bl_all - self.bl_all[0]) != 0:
                warnings.warn(f'The 4 channels have different black levels!!! ({self.bl_all})')
self.bl = self.bl_all[0]
else:
self.bl = self.args['bl']
self.wp = self.templet_raw.white_level
def change_eval_ratio(self, ratio):
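        # Rebuild the evaluation list for a given exposure ratio: pair each aligned GT
        # info with the first short-exposure frame at that ratio; with 'syn_noise' in
        # command, the noisy path is redirected to pre-synthesized noise of the chosen type.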
self.ratio = ratio
self.infos_gt = []
self.infos_short = []
for dstname in self.args['dstname']:
with open(f"infos/{dstname}_{self.args['GT_type']}.info", 'rb') as info_file:
info = pkl.load(info_file)
eval_id = self.get_eval_id(dstname)
for idx in eval_id:
self.infos_gt.append(info[idx])
with open(f'infos/{dstname}_short.info', 'rb') as info_file:
info = pkl.load(info_file)[ratio]
eval_id = self.get_eval_id(dstname)
for idx in eval_id:
self.infos_short.append(info[idx])
self.infos = self.infos_gt
for i in range(len(self.infos)):
self.infos[i]['hr'] = self.infos[i]['data']
self.infos[i]['lr'] = self.infos_short[i]['data'][0]
if 'syn_noise' in self.args['command'].lower():
nt = self.args['noise_type'].upper()
self.infos[i]['lr'] = self.infos[i]['hr'].replace('GT_align_ours', f'Noisy_{nt}/{ratio}')
self.infos[i]['ExposureTime'] = self.infos_short[i]['metadata'][0]['ExposureTime']
del self.infos[i]['data']
print(f'>> Successfully load infos.pkl (Length: {len(self.infos)})')
self.iso = 6400
self.length = len(self.infos)
self.templet_raw_path = self.infos_short[0]['data'][0]
self.templet_raw = rawpy.imread(self.templet_raw_path)
self.get_shape()
def get_eval_id(self, dstname='indoor_x5'):
if dstname == 'indoor_x5':
eval_ids = [4,14,25,41,44,51,52,53,58]
elif dstname == 'indoor_x3':
eval_ids = []#[0,6,15]
elif dstname == 'outdoor_x5':
eval_ids = [1,2,5]
elif dstname == 'outdoor_x3':
eval_ids = [9,21,22,32,44,51]
else:
eval_ids = []
return eval_ids
def __getitem__(self, idx):
data = {}
# dataload
hr_raw = np.array(dataload(self.infos[idx]['hr'])).reshape(self.H,self.W)
lr_raw = np.array(dataload(self.infos[idx]['lr'])).reshape(self.H,self.W)
data["hr"] = (hr_raw.astype(np.float32) - self.bl) / (self.wp - self.bl)
# lr_raw = bayer2rggb(lr_raw.astype(np.float32)) - self.bl_all.reshape(1,1,-1)
data["lr"] = (lr_raw.astype(np.float32) - self.bl) * self.ratio / (self.wp - self.bl)
data['name'] = f"{self.infos[idx]['name']}_x{self.ratio:02d}"
data['ratio'] = self.ratio
data['ccm'] = self.infos[idx]['ccm']
data['wb'] = self.infos[idx]['wb']
data['cfa'] = 'rggb'
data['ISO'] = self.iso
data['ExposureTime'] = self.infos[idx]['ExposureTime'] * 1000
if self.args['clip']:
data["hr"] = data["hr"].clip(0,1)
data["lr"] = data["lr"].clip(0,1)
return data
class ELD_Full_Dataset(Dataset):
def __init__(self, args=None):
super().__init__()
self.default_args()
if args is not None:
for key in args:
self.args[key] = args[key]
self.initialization()
def default_args(self):
self.args = {}
self.args['root_dir'] = 'ELD/'
self.args['ratio'] = 1
self.args['dstname'] = 'ELD'
self.args['params'] = None
self.args['mode'] = 'eval'
self.args['command'] = ''
self.args['wp'] = 16383
self.args['bl'] = 512
self.args['clip'] = False
def initialization(self):
        # Collect the data paths
self.suffix = {'CanonEOS70D':'CR2', 'CanonEOS700D':'CR2', 'NikonD850':'nef', 'SonyA7S2':'ARW'}
self.infos_all = {'CanonEOS70D':[], 'CanonEOS700D':[], 'NikonD850':[], 'SonyA7S2':[]}
iso_list = [800, 1600, 3200]
ratio_list = [1,10,100,200]
hr_ids = np.array([1, 6, 11, 16])
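        # ELD capture order: within each scene, frames 1/6/11/16 are long-exposure
        # references; the noisy frame index is iso_id*5 + ratio_id + 2 and is paired
        # with the nearest reference frame.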
for camera_type in self.infos_all:
sub_dir = f'{self.args["root_dir"]}/{camera_type}'
for scene in range(1,11):
for iso_id, iso in enumerate(iso_list):
for ratio_id, ratio in enumerate(ratio_list):
lr_id = iso_id*5 + ratio_id + 2
ind = np.argmin(np.abs(lr_id - hr_ids))
hr_id = hr_ids[ind]
name = f'IMG_{lr_id:04d}.{self.suffix[camera_type]}'
hr_name = f'IMG_{hr_id:04d}.{self.suffix[camera_type]}'
self.infos_all[camera_type].append({
'cam': camera_type,
'name': f'{camera_type}_{scene:02d}_{name[:-4]}',
'hr': f'{sub_dir}/scene-{scene}/{hr_name}',
'lr': f'{sub_dir}/scene-{scene}/{name}',
'iso': iso,
'ratio': ratio,
})
self.change_eval_ratio('SonyA7S2', ratio=1)
def __len__(self):
return self.length
def change_eval_ratio(self, cam='SonyA7S2', ratio=1, iso_list=None):
if iso_list is None:
iso_list = [800, 1600, 3200]
self.infos = []
for i in range(len(self.infos_all[cam])):
if self.infos_all[cam][i]['iso'] in iso_list and self.infos_all[cam][i]['ratio'] == ratio:
self.infos.append(self.infos_all[cam][i])
self.length = len(self.infos)
self.ratio = ratio
self.templet_raw_path = self.infos[0]['lr']
self.templet_raw = rawpy.imread(self.templet_raw_path)
self.get_shape()
log(f'Eval change to {cam} (length:{self.length}): ratio={ratio}, iso_list={iso_list}')
def get_shape(self):
self.shape = self.templet_raw.raw_image_visible.shape
self.H, self.W = self.shape
self.bl = np.array(self.templet_raw.black_level_per_channel)
if np.mean(self.bl - self.bl[0]) != 0:
            warnings.warn(f'The 4 channels have different black levels!!! ({self.bl})')
self.bl = self.bl[0]
self.wp = self.templet_raw.white_level
def __getitem__(self, idx):
data = {}
# dataload
hr_raw = np.array(dataload(self.infos[idx]['hr'])).reshape(self.H,self.W)
lr_raw = np.array(dataload(self.infos[idx]['lr'])).reshape(self.H,self.W)
data["hr"] = (hr_raw.astype(np.float32) - self.bl) / (self.wp - self.bl)
data["lr"] = (lr_raw.astype(np.float32) - self.bl) * self.infos[idx]['ratio'] / (self.wp - self.bl)
data['name'] = self.infos[idx]['name']
data['wb'], data['ccm'] = read_wb_ccm(rawpy.imread(self.infos[idx]['hr']))
data['ratio'] = self.infos[idx]['ratio']
data['ISO'] = self.infos[idx]['iso']
if self.args['clip']:
data["hr"] = data["hr"].clip(0,1)
data["lr"] = data["lr"].clip(0,1)
return data
'''
ds = np.load('E:/datasets/LRID/resources/darkshading-iso-6400.npy')
lr_paths = glob.glob()
raw_hr = rawpy.imread('F:/datasets/SELD/indoor_x5/100/004/000071_exp-512000000_iso-100_2022_08_17_06_21_43_034_12562490542059_orientation_0_camera-0.dng')
raw_lr = rawpy.imread('E:/datasets/LRID/indoor_x5/6400/1/004/000115_exp-8000000_iso-6400_2022_08_17_06_21_58_288_12578251173465_orientation_0_camera-0.dng')#.raw_image_visible
p = get_ISO_ExposureTime('F:/datasets/SELD/indoor_x5/100/004/000071_exp-512000000_iso-100_2022_08_17_06_21_43_034_12562490542059_orientation_0_camera-0.dng')
p['name'] = 'IMX686'
bl = raw_lr.black_level_per_channel[0]
print(raw_lr.black_level_per_channel)
p['bl'] = raw_lr.black_level_per_channel[0]
p['wp'] = 1023
p['ratio'] = 1
p['scale'] = (p['wp']-p['bl']) / p['ratio']
print(p)
lr_raw = (raw_lr.raw_image_visible.astype(np.float32) - p['bl']) / (p['wp'] - p['bl'])
hr_raw = (raw_hr.raw_image_visible.astype(np.float32) - p['bl']) / (p['wp'] - p['bl'])
hr_raw = np.load('E:/datasets/LRID/indoor_x5/npy/GT_align_ours/004.npy')
hr_raw = (hr_raw.astype(np.float32) - p['bl']) / (p['wp'] - p['bl'])
print(hr_raw.shape, hr_raw.min(), hr_raw.max())
print(lr_raw.shape, lr_raw.min(), lr_raw.max())
raw_hr.raw_image_visible[:] = hr_raw * (p['wp'] - p['bl']) + p['bl']
img_hr = raw_hr.postprocess(use_camera_wb=True, half_size=False, no_auto_bright=True, output_bps=8)
plt.imsave(f"{p['name']}.png", img_hr)
data = {'lr': lr_raw, 'hr':hr_raw.clip(0,1), 'name':p['name']}
'''