-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy path067-force-lightmode.py
More file actions
executable file
·130 lines (105 loc) · 3.85 KB
/
Copy path067-force-lightmode.py
File metadata and controls
executable file
·130 lines (105 loc) · 3.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#!/usr/bin/env python3
import os
import time
import shutil
import traceback
import subprocess
import importlib.util
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor, as_completed
import psutil
from PIL import Image, ImageStat, ImageOps
# Directories
src = "065-remove-page-borders"
dst = os.path.splitext(os.path.basename(__file__))[0]
lightness_txt_path = f"{dst}.lightness.txt"
os.makedirs(dst, exist_ok=True)
if 0:
# set config here
class DeskewConfig:
# Threshold to consider a page "black" (mean lightness close to 0)
# white page with lots of black text can have 80
BLACK_LIGHTNESS_THRESHOLD = 25
config = DeskewConfig()
else:
# load config from file
config_path = Path("066-force-lightmode-config.py")
spec = importlib.util.spec_from_file_location("force_lightmode_config", config_path)
config = importlib.util.module_from_spec(spec)
spec.loader.exec_module(config)
def get_physical_cpu_count():
try:
# Attempt to get the number of physical cores using psutil
return psutil.cpu_count(logical=False)
except AttributeError:
# If psutil is not available or does not support this function, use os.cpu_count()
return os.cpu_count()
max_workers = get_physical_cpu_count() or 1
def process_page(filepath):
filename = os.path.basename(filepath)
# page_number = int(filename[:-5]) # strip ".tiff" suffix
out_path = os.path.join(dst, filename)
# Compute mean lightness (0–100)
try:
with Image.open(filepath) as img:
gray = img.convert("L")
stat = ImageStat.Stat(gray)
lightness = stat.mean[0] / 255 * 100
except Exception:
lightness = -1.0
if lightness == -1.0 or lightness > config.BLACK_LIGHTNESS_THRESHOLD:
shutil.copy2(filepath, out_path)
return (filename, lightness, False)
if os.path.exists(out_path):
return (filename, lightness, True)
with Image.open(filepath) as img:
if img.mode not in ("L", "RGB"):
img = img.convert("RGB")
# Invert colors
inverted = ImageOps.invert(img)
inverted.save(out_path)
return (filename, lightness, True)
def try_process_page(filepath):
"return (result, exc)"
try:
return process_page(filepath), None
except Exception as exc:
return None, exc
def main():
t1 = int(time.time())
num_pages = 0
# Collect all TIFF files that actually need processing
tiff_files = []
for f in sorted(os.listdir(src)):
if not f.lower().endswith(".tiff"):
continue
in_path = os.path.join(src, f)
if 1:
# also process files that already exist in output
tiff_files.append(in_path)
continue
out_path = os.path.join(dst, f)
if os.path.exists(out_path):
# Skip files that already exist in output
continue
tiff_files.append(in_path)
# process pages in parallel
results = []
with ProcessPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(try_process_page, f): f for f in tiff_files}
for future in as_completed(futures):
res, err = future.result()
if err is not None:
executor.shutdown(cancel_futures=True)
raise err # propagate exception to main
# filename, lightness, done_invert = res
results.append(res)
num_pages += 1
with open(lightness_txt_path, "w", encoding="utf8") as fd:
# sort by lightness descending
for (filename, lightness, done_invert) in sorted(results, key=lambda x: -x[1]):
fd.write(f"{lightness:010.6f} {filename} {int(done_invert)}\n")
t2 = int(time.time())
print(f"Done {num_pages} pages in {t2 - t1} seconds")
if __name__ == "__main__":
main()