"""Helpers for JSON / JSON Lines file I/O and a parallel map with a progress bar."""

import json
from multiprocessing import Pool

import jsonlines
from tqdm import tqdm


def load_json(path):
    """Read the JSON document stored at ``path`` and return the parsed object."""
    # JSON is UTF-8 by specification; do not depend on the platform locale.
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def dump_json(x, path):
    """Serialize ``x`` as JSON (4-space indent) to ``path``, overwriting it."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(x, f, indent=4)


def apply_func(x, func, processes=8):
    """Apply ``func`` to every element of ``x`` in a process pool.

    This is a generator: no work starts until it is iterated.

    Args:
        x: Iterable of inputs. When it has a length, the progress bar shows a
            total; otherwise the bar runs without one.
        func: Callable run in worker processes (so it has to be picklable).
            Each result is unpacked as a 2-tuple ``(inval, outval)``.
        processes: Number of worker processes (defaults to 8).

    Yields:
        ``(inval, outval)`` pairs in completion order, which is not
        necessarily the order of ``x`` (``imap_unordered``).
    """
    try:
        total = len(x)
    except TypeError:
        # Unsized iterable (e.g. a generator): tqdm accepts total=None.
        total = None

    # The context manager tears the pool down once every result has been
    # consumed, or as soon as the caller abandons the generator, so worker
    # processes are not leaked.
    with Pool(processes) as pool:
        for inval, outval in tqdm(pool.imap_unordered(func, x), total=total):
            yield inval, outval


def load_jsonl(path, max_items=None):
    """Lazily yield the records of the JSON Lines file at ``path``.

    Args:
        path: File to read.
        max_items: Upper bound on the number of records yielded. ``None``
            (the default) means no limit; zero or a negative value yields
            nothing.

    Yields:
        One decoded record per line, at most ``max_items`` of them.
    """
    with jsonlines.open(path) as reader:
        if max_items is not None and max_items <= 0:
            return
        # Count from 1 so that ``count`` is the number of records already
        # yielded; stopping right after the yield avoids decoding a record
        # that would never be returned.
        for count, item in enumerate(reader, start=1):
            yield item
            if max_items is not None and count >= max_items:
                break


def dump_jsonl(data, path):
    """Write every element of the list ``data`` to ``path`` as JSON Lines.

    The file is opened in ``"w"`` mode, so existing content is replaced.

    Raises:
        AssertionError: If ``data`` is not a ``list``.
    """
    # Kept as an assert so callers observe the same exception type as before.
    assert isinstance(data, list), "dump_jsonl expects a list"
    with jsonlines.open(path, mode="w") as wr:
        for item in tqdm(data, leave=False):
            wr.write(item)


class dump_jsonl_multistage:
    """Incremental JSON Lines writer that stays open across several writes.

    The file is opened on construction (append mode by default) with
    ``flush=True`` passed to ``jsonlines.open``. Call ``close()`` when done,
    or use the instance as a context manager to have it closed automatically::

        with dump_jsonl_multistage(path) as out:
            out.add(record)
    """

    def __init__(self, path, mode="a"):
        """Open ``path`` for writing with the given ``mode``."""
        self.wr = jsonlines.open(path, mode=mode, flush=True)

    def __enter__(self):
        """Return the writer itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the underlying writer; exceptions are not suppressed."""
        self.close()

    def add(self, item):
        """Write a single record."""
        self.wr.write(item)

    def extend(self, items):
        """Write every record of the iterable ``items``, in order."""
        for item in items:
            self.add(item)

    def close(self):
        """Close the underlying jsonlines writer."""
        self.wr.close()