UPDATE: More unique filenames

To prevent conflicts with other extensions during importing modules and scripts
+VersionUP (beta4)
This commit is contained in:
Gourieff 2023-09-01 12:38:07 +07:00
parent 2c7187a3af
commit 9a86140cf2
10 changed files with 672 additions and 682 deletions

View File

@ -2,7 +2,7 @@
<img src="example/ReActor_logo_red.png" alt="logo" width="180px"/>
![Version](https://img.shields.io/badge/version-0.4.1_beta3-green?style=for-the-badge&labelColor=darkgreen)<hr>
![Version](https://img.shields.io/badge/version-0.4.1_beta4-green?style=for-the-badge&labelColor=darkgreen)<hr>
[![Commit activity](https://img.shields.io/github/commit-activity/t/Gourieff/sd-webui-reactor/main?cacheSeconds=0)](https://github.com/Gourieff/sd-webui-reactor/commits/main)
![Last commit](https://img.shields.io/github/last-commit/Gourieff/sd-webui-reactor/main?cacheSeconds=0)
[![Opened issues](https://img.shields.io/github/issues/Gourieff/sd-webui-reactor?color=red)](https://github.com/Gourieff/sd-webui-reactor/issues?cacheSeconds=0)

View File

@ -2,7 +2,7 @@
<img src="example/ReActor_logo_red.png" alt="logo" width="180px"/>
![Version](https://img.shields.io/badge/версия-0.4.1_beta3-green?style=for-the-badge&labelColor=darkgreen)<hr>
![Version](https://img.shields.io/badge/версия-0.4.1_beta4-green?style=for-the-badge&labelColor=darkgreen)<hr>
[![Commit activity](https://img.shields.io/github/commit-activity/t/Gourieff/sd-webui-reactor/main?cacheSeconds=0)](https://github.com/Gourieff/sd-webui-reactor/commits/main)
![Last commit](https://img.shields.io/github/last-commit/Gourieff/sd-webui-reactor/main?cacheSeconds=0)
[![Opened issues](https://img.shields.io/github/issues/Gourieff/sd-webui-reactor?color=red)](https://github.com/Gourieff/sd-webui-reactor/issues?cacheSeconds=0)

View File

@ -14,7 +14,7 @@ from insightface.model_zoo import model_zoo
import onnxruntime
import onnx
from onnx import numpy_helper
from scripts.logger import logger
from scripts.reactor_logger import logger
def patched_get_model(self, **kwargs):

View File

@ -14,8 +14,8 @@ from modules.api import api
import gradio as gr
from scripts.swapper import UpscaleOptions, swap_face
from scripts.logger import logger
from scripts.reactor_swapper import UpscaleOptions, swap_face
from scripts.reactor_logger import logger
def default_file_path():

View File

@ -1,362 +1,362 @@
import os, glob
import gradio as gr
from PIL import Image
from typing import List
import modules.scripts as scripts
from modules.upscaler import Upscaler, UpscalerData
from modules import scripts, shared, images, scripts_postprocessing
from modules.processing import (
Processed,
StableDiffusionProcessing,
StableDiffusionProcessingImg2Img,
)
from modules.face_restoration import FaceRestoration
from modules.paths_internal import models_path
from modules.images import save_image
from scripts.logger import logger
from scripts.swapper import UpscaleOptions, swap_face, check_process_halt, reset_messaged
from scripts.version import version_flag, app_title
from scripts.console_log_patch import apply_logging_patch
from scripts.helpers import make_grid
MODELS_PATH = None
def get_models():
global MODELS_PATH
models_path_init = os.path.join(models_path, "insightface/*")
models = glob.glob(models_path_init)
models = [x for x in models if x.endswith(".onnx") or x.endswith(".pth")]
models_names = []
for model in models:
model_path = os.path.split(model)
if MODELS_PATH is None:
MODELS_PATH = model_path[0]
model_name = model_path[1]
models_names.append(model_name)
return models_names
class FaceSwapScript(scripts.Script):
def title(self):
return f"{app_title}"
def show(self, is_img2img):
return scripts.AlwaysVisible
def ui(self, is_img2img):
with gr.Accordion(f"{app_title}", open=False):
with gr.Tab("Main"):
with gr.Column():
img = gr.inputs.Image(type="pil")
enable = gr.Checkbox(False, label="Enable", info=f"The Fast and Simple FaceSwap Extension - {version_flag}")
save_original = gr.Checkbox(False, label="Save Original", info="Save the original image(s) made before swapping; If you use \"img2img\" - this option will affect with \"Swap in generated\" only")
gr.Markdown("<br>")
gr.Markdown("Source Image (above):")
with gr.Row():
source_faces_index = gr.Textbox(
value="0",
placeholder="Which face(s) to use as Source (comma separated)",
label="Comma separated face number(s); Example: 0,2,1",
)
gender_source = gr.Radio(
["No", "Female Only", "Male Only"],
value="No",
label="Gender Detection (Source)",
type="index",
)
gr.Markdown("<br>")
gr.Markdown("Target Image (result):")
with gr.Row():
faces_index = gr.Textbox(
value="0",
placeholder="Which face(s) to Swap into Target (comma separated)",
label="Comma separated face number(s); Example: 1,0,2",
)
gender_target = gr.Radio(
["No", "Female Only", "Male Only"],
value="No",
label="Gender Detection (Target)",
type="index",
)
gr.Markdown("<br>")
with gr.Row():
face_restorer_name = gr.Radio(
label="Restore Face",
choices=["None"] + [x.name() for x in shared.face_restorers],
value=shared.face_restorers[0].name(),
type="value",
)
face_restorer_visibility = gr.Slider(
0, 1, 1, step=0.1, label="Restore Face Visibility"
)
gr.Markdown("<br>")
swap_in_source = gr.Checkbox(
False,
label="Swap in source image",
visible=is_img2img,
)
swap_in_generated = gr.Checkbox(
True,
label="Swap in generated image",
visible=is_img2img,
)
with gr.Tab("Upscale"):
restore_first = gr.Checkbox(
True,
label="1. Restore Face -> 2. Upscale (-Uncheck- if you want vice versa)",
info="Postprocessing Order"
)
upscaler_name = gr.inputs.Dropdown(
choices=[upscaler.name for upscaler in shared.sd_upscalers],
label="Upscaler",
)
gr.Markdown("<br>")
with gr.Row():
upscaler_scale = gr.Slider(1, 8, 1, step=0.1, label="Scale by")
upscaler_visibility = gr.Slider(
0, 1, 1, step=0.1, label="Upscaler Visibility (if scale = 1)"
)
with gr.Tab("Settings"):
models = get_models()
with gr.Row():
if len(models) == 0:
logger.warning(
"You should at least have one model in models directory, please read the doc here : https://github.com/Gourieff/sd-webui-reactor/"
)
model = gr.inputs.Dropdown(
choices=models,
label="Model not found, please download one and reload WebUI",
)
else:
model = gr.inputs.Dropdown(
choices=models, label="Model", default=models[0]
)
console_logging_level = gr.Radio(
["No log", "Minimum", "Default"],
value="Minimum",
label="Console Log Level",
type="index",
)
return [
img,
enable,
source_faces_index,
faces_index,
model,
face_restorer_name,
face_restorer_visibility,
restore_first,
upscaler_name,
upscaler_scale,
upscaler_visibility,
swap_in_source,
swap_in_generated,
console_logging_level,
gender_source,
gender_target,
save_original,
]
@property
def upscaler(self) -> UpscalerData:
for upscaler in shared.sd_upscalers:
if upscaler.name == self.upscaler_name:
return upscaler
return None
@property
def face_restorer(self) -> FaceRestoration:
for face_restorer in shared.face_restorers:
if face_restorer.name() == self.face_restorer_name:
return face_restorer
return None
@property
def upscale_options(self) -> UpscaleOptions:
return UpscaleOptions(
do_restore_first = self.restore_first,
scale=self.upscaler_scale,
upscaler=self.upscaler,
face_restorer=self.face_restorer,
upscale_visibility=self.upscaler_visibility,
restorer_visibility=self.face_restorer_visibility,
)
def process(
self,
p: StableDiffusionProcessing,
img,
enable,
source_faces_index,
faces_index,
model,
face_restorer_name,
face_restorer_visibility,
restore_first,
upscaler_name,
upscaler_scale,
upscaler_visibility,
swap_in_source,
swap_in_generated,
console_logging_level,
gender_source,
gender_target,
save_original,
):
self.enable = enable
if self.enable:
reset_messaged()
if check_process_halt():
return
global MODELS_PATH
self.source = img
self.face_restorer_name = face_restorer_name
self.upscaler_scale = upscaler_scale
self.upscaler_visibility = upscaler_visibility
self.face_restorer_visibility = face_restorer_visibility
self.restore_first = restore_first
self.upscaler_name = upscaler_name
self.swap_in_generated = swap_in_generated
self.model = os.path.join(MODELS_PATH,model)
self.console_logging_level = console_logging_level
self.gender_source = gender_source
self.gender_target = gender_target
self.save_original = save_original
if self.gender_source is None or self.gender_source == "No":
self.gender_source = 0
if self.gender_target is None or self.gender_target == "No":
self.gender_target = 0
self.source_faces_index = [
int(x) for x in source_faces_index.strip(",").split(",") if x.isnumeric()
]
self.faces_index = [
int(x) for x in faces_index.strip(",").split(",") if x.isnumeric()
]
if len(self.source_faces_index) == 0:
self.source_faces_index = [0]
if len(self.faces_index) == 0:
self.faces_index = [0]
if self.source is not None:
apply_logging_patch(console_logging_level)
if isinstance(p, StableDiffusionProcessingImg2Img) and swap_in_source:
logger.info("Working: source face index %s, target face index %s", self.source_faces_index, self.faces_index)
for i in range(len(p.init_images)):
if len(p.init_images) > 1:
logger.info("Swap in %s", i)
result = swap_face(
self.source,
p.init_images[i],
source_faces_index=self.source_faces_index,
faces_index=self.faces_index,
model=self.model,
upscale_options=self.upscale_options,
gender_source=self.gender_source,
gender_target=self.gender_target,
)
p.init_images[i] = result
if shared.state.interrupted or shared.state.skipped:
return
else:
logger.error("Please provide a source face")
def postprocess(self, p: StableDiffusionProcessing, processed: Processed, *args):
if self.enable:
reset_messaged()
if check_process_halt():
return
if self.save_original:
postprocess_run: bool = True
orig_images : List[Image.Image] = processed.images[processed.index_of_first_image:]
orig_infotexts : List[str] = processed.infotexts[processed.index_of_first_image:]
result_images: List = processed.images
if self.swap_in_generated:
logger.info("Working: source face index %s, target face index %s", self.source_faces_index, self.faces_index)
if self.source is not None:
for i,(img,info) in enumerate(zip(orig_images, orig_infotexts)):
if check_process_halt():
postprocess_run = False
break
if len(orig_images) > 1:
logger.info("Swap in %s", i)
result = swap_face(
self.source,
img,
source_faces_index=self.source_faces_index,
faces_index=self.faces_index,
model=self.model,
upscale_options=self.upscale_options,
gender_source=self.gender_source,
gender_target=self.gender_target,
)
if result is not None:
suffix = "-swapped"
result_images.append(result)
try:
save_image(result, p.outpath_samples, "", p.all_seeds[0], p.all_prompts[0], "png",info=info, p=p, suffix=suffix)
except:
logger.error("Cannot save a result image - please, check SD WebUI Settings (Saving and Paths)")
else:
logger.error("Cannot create a result image")
if shared.opts.return_grid and len(result_images) > 2 and postprocess_run:
grid = make_grid(result_images)
result_images.insert(0, grid)
try:
save_image(grid, p.outpath_grids, "grid", p.all_seeds[0], p.all_prompts[0], shared.opts.grid_format, info=info, short_filename=not shared.opts.grid_extended_filename, p=p, grid=True)
except:
logger.error("Cannot save a grid - please, check SD WebUI Settings (Saving and Paths)")
processed.images = result_images
def postprocess_batch(self, p, *args, **kwargs):
if self.enable and not self.save_original:
images = kwargs["images"]
def postprocess_image(self, p, script_pp: scripts.PostprocessImageArgs, *args):
if self.enable and self.swap_in_generated and not self.save_original:
current_job_number = shared.state.job_no + 1
job_count = shared.state.job_count
if current_job_number == job_count:
reset_messaged()
if check_process_halt():
return
if self.source is not None:
logger.info("Working: source face index %s, target face index %s", self.source_faces_index, self.faces_index)
image: Image.Image = script_pp.image
result = swap_face(
self.source,
image,
source_faces_index=self.source_faces_index,
faces_index=self.faces_index,
model=self.model,
upscale_options=self.upscale_options,
gender_source=self.gender_source,
gender_target=self.gender_target,
)
try:
pp = scripts_postprocessing.PostprocessedImage(result)
pp.info = {}
p.extra_generation_params.update(pp.info)
script_pp.image = pp.image
except:
logger.error("Cannot create a result image")
import os, glob
import gradio as gr
from PIL import Image
from typing import List
import modules.scripts as scripts
from modules.upscaler import Upscaler, UpscalerData
from modules import scripts, shared, images, scripts_postprocessing
from modules.processing import (
Processed,
StableDiffusionProcessing,
StableDiffusionProcessingImg2Img,
)
from modules.face_restoration import FaceRestoration
from modules.paths_internal import models_path
from modules.images import save_image
from scripts.reactor_logger import logger
from scripts.reactor_swapper import UpscaleOptions, swap_face, check_process_halt, reset_messaged
from scripts.reactor_version import version_flag, app_title
from scripts.console_log_patch import apply_logging_patch
from scripts.reactor_helpers import make_grid
MODELS_PATH = None
def get_models():
global MODELS_PATH
models_path_init = os.path.join(models_path, "insightface/*")
models = glob.glob(models_path_init)
models = [x for x in models if x.endswith(".onnx") or x.endswith(".pth")]
models_names = []
for model in models:
model_path = os.path.split(model)
if MODELS_PATH is None:
MODELS_PATH = model_path[0]
model_name = model_path[1]
models_names.append(model_name)
return models_names
class FaceSwapScript(scripts.Script):
def title(self):
return f"{app_title}"
def show(self, is_img2img):
return scripts.AlwaysVisible
def ui(self, is_img2img):
with gr.Accordion(f"{app_title}", open=False):
with gr.Tab("Main"):
with gr.Column():
img = gr.inputs.Image(type="pil")
enable = gr.Checkbox(False, label="Enable", info=f"The Fast and Simple FaceSwap Extension - {version_flag}")
save_original = gr.Checkbox(False, label="Save Original", info="Save the original image(s) made before swapping; If you use \"img2img\" - this option will affect with \"Swap in generated\" only")
gr.Markdown("<br>")
gr.Markdown("Source Image (above):")
with gr.Row():
source_faces_index = gr.Textbox(
value="0",
placeholder="Which face(s) to use as Source (comma separated)",
label="Comma separated face number(s); Example: 0,2,1",
)
gender_source = gr.Radio(
["No", "Female Only", "Male Only"],
value="No",
label="Gender Detection (Source)",
type="index",
)
gr.Markdown("<br>")
gr.Markdown("Target Image (result):")
with gr.Row():
faces_index = gr.Textbox(
value="0",
placeholder="Which face(s) to Swap into Target (comma separated)",
label="Comma separated face number(s); Example: 1,0,2",
)
gender_target = gr.Radio(
["No", "Female Only", "Male Only"],
value="No",
label="Gender Detection (Target)",
type="index",
)
gr.Markdown("<br>")
with gr.Row():
face_restorer_name = gr.Radio(
label="Restore Face",
choices=["None"] + [x.name() for x in shared.face_restorers],
value=shared.face_restorers[0].name(),
type="value",
)
face_restorer_visibility = gr.Slider(
0, 1, 1, step=0.1, label="Restore Face Visibility"
)
gr.Markdown("<br>")
swap_in_source = gr.Checkbox(
False,
label="Swap in source image",
visible=is_img2img,
)
swap_in_generated = gr.Checkbox(
True,
label="Swap in generated image",
visible=is_img2img,
)
with gr.Tab("Upscale"):
restore_first = gr.Checkbox(
True,
label="1. Restore Face -> 2. Upscale (-Uncheck- if you want vice versa)",
info="Postprocessing Order"
)
upscaler_name = gr.inputs.Dropdown(
choices=[upscaler.name for upscaler in shared.sd_upscalers],
label="Upscaler",
)
gr.Markdown("<br>")
with gr.Row():
upscaler_scale = gr.Slider(1, 8, 1, step=0.1, label="Scale by")
upscaler_visibility = gr.Slider(
0, 1, 1, step=0.1, label="Upscaler Visibility (if scale = 1)"
)
with gr.Tab("Settings"):
models = get_models()
with gr.Row():
if len(models) == 0:
logger.warning(
"You should at least have one model in models directory, please read the doc here : https://github.com/Gourieff/sd-webui-reactor/"
)
model = gr.inputs.Dropdown(
choices=models,
label="Model not found, please download one and reload WebUI",
)
else:
model = gr.inputs.Dropdown(
choices=models, label="Model", default=models[0]
)
console_logging_level = gr.Radio(
["No log", "Minimum", "Default"],
value="Minimum",
label="Console Log Level",
type="index",
)
return [
img,
enable,
source_faces_index,
faces_index,
model,
face_restorer_name,
face_restorer_visibility,
restore_first,
upscaler_name,
upscaler_scale,
upscaler_visibility,
swap_in_source,
swap_in_generated,
console_logging_level,
gender_source,
gender_target,
save_original,
]
@property
def upscaler(self) -> UpscalerData:
for upscaler in shared.sd_upscalers:
if upscaler.name == self.upscaler_name:
return upscaler
return None
@property
def face_restorer(self) -> FaceRestoration:
for face_restorer in shared.face_restorers:
if face_restorer.name() == self.face_restorer_name:
return face_restorer
return None
@property
def upscale_options(self) -> UpscaleOptions:
return UpscaleOptions(
do_restore_first = self.restore_first,
scale=self.upscaler_scale,
upscaler=self.upscaler,
face_restorer=self.face_restorer,
upscale_visibility=self.upscaler_visibility,
restorer_visibility=self.face_restorer_visibility,
)
def process(
self,
p: StableDiffusionProcessing,
img,
enable,
source_faces_index,
faces_index,
model,
face_restorer_name,
face_restorer_visibility,
restore_first,
upscaler_name,
upscaler_scale,
upscaler_visibility,
swap_in_source,
swap_in_generated,
console_logging_level,
gender_source,
gender_target,
save_original,
):
self.enable = enable
if self.enable:
reset_messaged()
if check_process_halt():
return
global MODELS_PATH
self.source = img
self.face_restorer_name = face_restorer_name
self.upscaler_scale = upscaler_scale
self.upscaler_visibility = upscaler_visibility
self.face_restorer_visibility = face_restorer_visibility
self.restore_first = restore_first
self.upscaler_name = upscaler_name
self.swap_in_generated = swap_in_generated
self.model = os.path.join(MODELS_PATH,model)
self.console_logging_level = console_logging_level
self.gender_source = gender_source
self.gender_target = gender_target
self.save_original = save_original
if self.gender_source is None or self.gender_source == "No":
self.gender_source = 0
if self.gender_target is None or self.gender_target == "No":
self.gender_target = 0
self.source_faces_index = [
int(x) for x in source_faces_index.strip(",").split(",") if x.isnumeric()
]
self.faces_index = [
int(x) for x in faces_index.strip(",").split(",") if x.isnumeric()
]
if len(self.source_faces_index) == 0:
self.source_faces_index = [0]
if len(self.faces_index) == 0:
self.faces_index = [0]
if self.source is not None:
apply_logging_patch(console_logging_level)
if isinstance(p, StableDiffusionProcessingImg2Img) and swap_in_source:
logger.info("Working: source face index %s, target face index %s", self.source_faces_index, self.faces_index)
for i in range(len(p.init_images)):
if len(p.init_images) > 1:
logger.info("Swap in %s", i)
result = swap_face(
self.source,
p.init_images[i],
source_faces_index=self.source_faces_index,
faces_index=self.faces_index,
model=self.model,
upscale_options=self.upscale_options,
gender_source=self.gender_source,
gender_target=self.gender_target,
)
p.init_images[i] = result
if shared.state.interrupted or shared.state.skipped:
return
else:
logger.error("Please provide a source face")
def postprocess(self, p: StableDiffusionProcessing, processed: Processed, *args):
if self.enable:
reset_messaged()
if check_process_halt():
return
if self.save_original:
postprocess_run: bool = True
orig_images : List[Image.Image] = processed.images[processed.index_of_first_image:]
orig_infotexts : List[str] = processed.infotexts[processed.index_of_first_image:]
result_images: List = processed.images
if self.swap_in_generated:
logger.info("Working: source face index %s, target face index %s", self.source_faces_index, self.faces_index)
if self.source is not None:
for i,(img,info) in enumerate(zip(orig_images, orig_infotexts)):
if check_process_halt():
postprocess_run = False
break
if len(orig_images) > 1:
logger.info("Swap in %s", i)
result = swap_face(
self.source,
img,
source_faces_index=self.source_faces_index,
faces_index=self.faces_index,
model=self.model,
upscale_options=self.upscale_options,
gender_source=self.gender_source,
gender_target=self.gender_target,
)
if result is not None:
suffix = "-swapped"
result_images.append(result)
try:
save_image(result, p.outpath_samples, "", p.all_seeds[0], p.all_prompts[0], "png",info=info, p=p, suffix=suffix)
except:
logger.error("Cannot save a result image - please, check SD WebUI Settings (Saving and Paths)")
else:
logger.error("Cannot create a result image")
if shared.opts.return_grid and len(result_images) > 2 and postprocess_run:
grid = make_grid(result_images)
result_images.insert(0, grid)
try:
save_image(grid, p.outpath_grids, "grid", p.all_seeds[0], p.all_prompts[0], shared.opts.grid_format, info=info, short_filename=not shared.opts.grid_extended_filename, p=p, grid=True)
except:
logger.error("Cannot save a grid - please, check SD WebUI Settings (Saving and Paths)")
processed.images = result_images
def postprocess_batch(self, p, *args, **kwargs):
if self.enable and not self.save_original:
images = kwargs["images"]
def postprocess_image(self, p, script_pp: scripts.PostprocessImageArgs, *args):
if self.enable and self.swap_in_generated and not self.save_original:
current_job_number = shared.state.job_no + 1
job_count = shared.state.job_count
if current_job_number == job_count:
reset_messaged()
if check_process_halt():
return
if self.source is not None:
logger.info("Working: source face index %s, target face index %s", self.source_faces_index, self.faces_index)
image: Image.Image = script_pp.image
result = swap_face(
self.source,
image,
source_faces_index=self.source_faces_index,
faces_index=self.faces_index,
model=self.model,
upscale_options=self.upscale_options,
gender_source=self.gender_source,
gender_target=self.gender_target,
)
try:
pp = scripts_postprocessing.PostprocessedImage(result)
pp.info = {}
p.extra_generation_params.update(pp.info)
script_pp.image = pp.image
except:
logger.error("Cannot create a result image")

View File

@ -4,16 +4,6 @@ from math import isqrt, ceil
from typing import List
def make_grid(image_list: List):
"""
Creates a square image by combining multiple images in a grid pattern.
Args:
image_list (list): List of PIL Image objects to be combined.
Returns:
PIL Image object: The resulting square image.
None: If the image_list is empty or contains only one image.
"""
# Count the occurrences of each image size in the image_list
size_counter = Counter(image.size for image in image_list)
@ -52,4 +42,4 @@ def make_grid(image_list: List):
return square_image
# Return None if there are no images or only one image in the image_list
return None
return None

View File

@ -3,7 +3,7 @@ import copy
import sys
from modules import shared
from scripts.globals import IS_RUN
from scripts.reactor_globals import IS_RUN
class ColoredFormatter(logging.Formatter):

View File

@ -1,301 +1,301 @@
import copy
import os
from dataclasses import dataclass
from typing import List, Union
import cv2
import numpy as np
from PIL import Image
import insightface
import onnxruntime
from modules.face_restoration import FaceRestoration
from modules.upscaler import UpscalerData
from modules.shared import state
from modules.paths_internal import models_path
from scripts.logger import logger
import warnings
np.warnings = warnings
np.warnings.filterwarnings('ignore')
providers = onnxruntime.get_available_providers()
@dataclass
class UpscaleOptions:
do_restore_first: bool = True
scale: int = 1
upscaler: UpscalerData = None
upscale_visibility: float = 0.5
face_restorer: FaceRestoration = None
restorer_visibility: float = 0.5
def cosine_distance(vector1: np.ndarray, vector2: np.ndarray) -> float:
vec1 = vector1.flatten()
vec2 = vector2.flatten()
dot_product = np.dot(vec1, vec2)
norm1 = np.linalg.norm(vec1)
norm2 = np.linalg.norm(vec2)
cosine_distance = 1 - (dot_product / (norm1 * norm2))
return cosine_distance
def cosine_similarity(test_vec: np.ndarray, source_vecs: List[np.ndarray]) -> float:
cos_dist = sum(cosine_distance(test_vec, source_vec) for source_vec in source_vecs)
average_cos_dist = cos_dist / len(source_vecs)
return average_cos_dist
MESSAGED_STOPPED = False
MESSAGED_SKIPPED = False
def reset_messaged():
global MESSAGED_STOPPED, MESSAGED_SKIPPED
if not state.interrupted:
MESSAGED_STOPPED = False
if not state.skipped:
MESSAGED_SKIPPED = False
def check_process_halt(msgforced: bool = False):
global MESSAGED_STOPPED, MESSAGED_SKIPPED
if state.interrupted:
if not MESSAGED_STOPPED or msgforced:
logger.info("Stopped by User")
MESSAGED_STOPPED = True
return True
if state.skipped:
if not MESSAGED_SKIPPED or msgforced:
logger.info("Skipped by User")
MESSAGED_SKIPPED = True
return True
return False
FS_MODEL = None
CURRENT_FS_MODEL_PATH = None
ANALYSIS_MODEL = None
def getAnalysisModel():
global ANALYSIS_MODEL
if ANALYSIS_MODEL is None:
ANALYSIS_MODEL = insightface.app.FaceAnalysis(
name="buffalo_l", providers=providers, root=os.path.join(models_path, "insightface") # note: allowed_modules=['detection', 'genderage']
)
return ANALYSIS_MODEL
def getFaceSwapModel(model_path: str):
global FS_MODEL
global CURRENT_FS_MODEL_PATH
if CURRENT_FS_MODEL_PATH is None or CURRENT_FS_MODEL_PATH != model_path:
CURRENT_FS_MODEL_PATH = model_path
FS_MODEL = insightface.model_zoo.get_model(model_path, providers=providers)
return FS_MODEL
def upscale_image(image: Image, upscale_options: UpscaleOptions):
result_image = image
if check_process_halt(msgforced=True):
return result_image
if upscale_options.do_restore_first:
if upscale_options.face_restorer is not None:
original_image = result_image.copy()
logger.info("Restoring the face with %s", upscale_options.face_restorer.name())
numpy_image = np.array(result_image)
numpy_image = upscale_options.face_restorer.restore(numpy_image)
restored_image = Image.fromarray(numpy_image)
result_image = Image.blend(
original_image, restored_image, upscale_options.restorer_visibility
)
if upscale_options.upscaler is not None and upscale_options.upscaler.name != "None":
original_image = result_image.copy()
logger.info(
"Upscaling with %s scale = %s",
upscale_options.upscaler.name,
upscale_options.scale,
)
result_image = upscale_options.upscaler.scaler.upscale(
original_image, upscale_options.scale, upscale_options.upscaler.data_path
)
if upscale_options.scale == 1:
result_image = Image.blend(
original_image, result_image, upscale_options.upscale_visibility
)
else:
if upscale_options.upscaler is not None and upscale_options.upscaler.name != "None":
original_image = result_image.copy()
logger.info(
"Upscaling with %s scale = %s",
upscale_options.upscaler.name,
upscale_options.scale,
)
result_image = upscale_options.upscaler.scaler.upscale(
image, upscale_options.scale, upscale_options.upscaler.data_path
)
if upscale_options.scale == 1:
result_image = Image.blend(
original_image, result_image, upscale_options.upscale_visibility
)
if upscale_options.face_restorer is not None:
original_image = result_image.copy()
logger.info("Restoring the face with %s", upscale_options.face_restorer.name())
numpy_image = np.array(result_image)
numpy_image = upscale_options.face_restorer.restore(numpy_image)
restored_image = Image.fromarray(numpy_image)
result_image = Image.blend(
original_image, restored_image, upscale_options.restorer_visibility
)
return result_image
def get_face_gender(
face,
face_index,
gender_condition,
operated: str
):
gender = [
x.sex
for x in face
]
gender.reverse()
face_gender = gender[face_index]
logger.info("%s Face %s: Detected Gender -%s-", operated, face_index, face_gender)
if (gender_condition == 1 and face_gender == "F") or (gender_condition == 2 and face_gender == "M"):
logger.info("OK - Detected Gender matches Condition")
try:
return sorted(face, key=lambda x: x.bbox[0])[face_index], 0
except IndexError:
return None, 0
else:
logger.info("WRONG - Detected Gender doesn't match Condition")
return sorted(face, key=lambda x: x.bbox[0])[face_index], 1
def reget_face_single(img_data, det_size, face_index):
det_size_half = (det_size[0] // 2, det_size[1] // 2)
return get_face_single(img_data, face_index=face_index, det_size=det_size_half)
def get_face_single(img_data: np.ndarray, face_index=0, det_size=(640, 640), gender_source=0, gender_target=0):
face_analyser = copy.deepcopy(getAnalysisModel())
face_analyser.prepare(ctx_id=0, det_size=det_size)
face = face_analyser.get(img_data)
buffalo_path = os.path.join(models_path, "insightface/models/buffalo_l.zip")
if os.path.exists(buffalo_path):
os.remove(buffalo_path)
if gender_source != 0:
if len(face) == 0 and det_size[0] > 320 and det_size[1] > 320:
return reget_face_single(img_data, det_size, face_index)
return get_face_gender(face,face_index,gender_source,"Source")
if gender_target != 0:
if len(face) == 0 and det_size[0] > 320 and det_size[1] > 320:
return reget_face_single(img_data, det_size, face_index)
return get_face_gender(face,face_index,gender_target,"Target")
if len(face) == 0 and det_size[0] > 320 and det_size[1] > 320:
return reget_face_single(img_data, det_size, face_index)
try:
return sorted(face, key=lambda x: x.bbox[0])[face_index], 0
except IndexError:
return None, 0
def swap_face(
source_img: Image.Image,
target_img: Image.Image,
model: Union[str, None] = None,
source_faces_index: List[int] = [0],
faces_index: List[int] = [0],
upscale_options: Union[UpscaleOptions, None] = None,
gender_source: int = 0,
gender_target: int = 0,
):
result_image = target_img
if check_process_halt():
return result_image
if model is not None:
if isinstance(source_img, str): # source_img is a base64 string
import base64, io
if 'base64,' in source_img: # check if the base64 string has a data URL scheme
# split the base64 string to get the actual base64 encoded image data
base64_data = source_img.split('base64,')[-1]
# decode base64 string to bytes
img_bytes = base64.b64decode(base64_data)
else:
# if no data URL scheme, just decode
img_bytes = base64.b64decode(source_img)
source_img = Image.open(io.BytesIO(img_bytes))
source_img = cv2.cvtColor(np.array(source_img), cv2.COLOR_RGB2BGR)
target_img = cv2.cvtColor(np.array(target_img), cv2.COLOR_RGB2BGR)
source_face, wrong_gender = get_face_single(source_img, face_index=source_faces_index[0], gender_source=gender_source)
if len(source_faces_index) != 0 and len(source_faces_index) != 1 and len(source_faces_index) != len(faces_index):
logger.info("Source Faces must have no entries (default=0), one entry, or same number of entries as target faces.")
elif source_face is not None:
result = target_img
face_swapper = getFaceSwapModel(model)
source_face_idx = 0
swapped = 0
for face_num in faces_index:
if len(source_faces_index) > 1 and source_face_idx > 0:
source_face, wrong_gender = get_face_single(source_img, face_index=source_faces_index[source_face_idx], gender_source=gender_source)
source_face_idx += 1
if source_face is not None and wrong_gender == 0:
target_face, wrong_gender = get_face_single(target_img, face_index=face_num, gender_target=gender_target)
if target_face is not None and wrong_gender == 0:
result = face_swapper.get(result, target_face, source_face)
swapped += 1
elif wrong_gender == 1:
wrong_gender = 0
if source_face_idx == len(source_faces_index):
result_image = Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB))
if upscale_options is not None:
result_image = upscale_image(result_image, upscale_options)
return result_image
else:
logger.info(f"No target face found for {face_num}")
elif wrong_gender == 1:
wrong_gender = 0
if source_face_idx == len(source_faces_index):
result_image = Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB))
if upscale_options is not None:
result_image = upscale_image(result_image, upscale_options)
return result_image
else:
logger.info(f"No source face found for face number {source_face_idx}.")
result_image = Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB))
if upscale_options is not None and swapped > 0:
result_image = upscale_image(result_image, upscale_options)
else:
logger.info("No source face(s) found")
return result_image
import copy
import os
from dataclasses import dataclass
from typing import List, Union
import cv2
import numpy as np
from PIL import Image
import insightface
import onnxruntime
from modules.face_restoration import FaceRestoration
from modules.upscaler import UpscalerData
from modules.shared import state
from modules.paths_internal import models_path
from scripts.reactor_logger import logger
import warnings
np.warnings = warnings
np.warnings.filterwarnings('ignore')
providers = onnxruntime.get_available_providers()
@dataclass
class UpscaleOptions:
do_restore_first: bool = True
scale: int = 1
upscaler: UpscalerData = None
upscale_visibility: float = 0.5
face_restorer: FaceRestoration = None
restorer_visibility: float = 0.5
def cosine_distance(vector1: np.ndarray, vector2: np.ndarray) -> float:
vec1 = vector1.flatten()
vec2 = vector2.flatten()
dot_product = np.dot(vec1, vec2)
norm1 = np.linalg.norm(vec1)
norm2 = np.linalg.norm(vec2)
cosine_distance = 1 - (dot_product / (norm1 * norm2))
return cosine_distance
def cosine_similarity(test_vec: np.ndarray, source_vecs: List[np.ndarray]) -> float:
cos_dist = sum(cosine_distance(test_vec, source_vec) for source_vec in source_vecs)
average_cos_dist = cos_dist / len(source_vecs)
return average_cos_dist
MESSAGED_STOPPED = False
MESSAGED_SKIPPED = False
def reset_messaged():
global MESSAGED_STOPPED, MESSAGED_SKIPPED
if not state.interrupted:
MESSAGED_STOPPED = False
if not state.skipped:
MESSAGED_SKIPPED = False
def check_process_halt(msgforced: bool = False):
global MESSAGED_STOPPED, MESSAGED_SKIPPED
if state.interrupted:
if not MESSAGED_STOPPED or msgforced:
logger.info("Stopped by User")
MESSAGED_STOPPED = True
return True
if state.skipped:
if not MESSAGED_SKIPPED or msgforced:
logger.info("Skipped by User")
MESSAGED_SKIPPED = True
return True
return False
FS_MODEL = None
CURRENT_FS_MODEL_PATH = None
ANALYSIS_MODEL = None
def getAnalysisModel():
global ANALYSIS_MODEL
if ANALYSIS_MODEL is None:
ANALYSIS_MODEL = insightface.app.FaceAnalysis(
name="buffalo_l", providers=providers, root=os.path.join(models_path, "insightface") # note: allowed_modules=['detection', 'genderage']
)
return ANALYSIS_MODEL
def getFaceSwapModel(model_path: str):
global FS_MODEL
global CURRENT_FS_MODEL_PATH
if CURRENT_FS_MODEL_PATH is None or CURRENT_FS_MODEL_PATH != model_path:
CURRENT_FS_MODEL_PATH = model_path
FS_MODEL = insightface.model_zoo.get_model(model_path, providers=providers)
return FS_MODEL
def upscale_image(image: Image, upscale_options: UpscaleOptions):
result_image = image
if check_process_halt(msgforced=True):
return result_image
if upscale_options.do_restore_first:
if upscale_options.face_restorer is not None:
original_image = result_image.copy()
logger.info("Restoring the face with %s", upscale_options.face_restorer.name())
numpy_image = np.array(result_image)
numpy_image = upscale_options.face_restorer.restore(numpy_image)
restored_image = Image.fromarray(numpy_image)
result_image = Image.blend(
original_image, restored_image, upscale_options.restorer_visibility
)
if upscale_options.upscaler is not None and upscale_options.upscaler.name != "None":
original_image = result_image.copy()
logger.info(
"Upscaling with %s scale = %s",
upscale_options.upscaler.name,
upscale_options.scale,
)
result_image = upscale_options.upscaler.scaler.upscale(
original_image, upscale_options.scale, upscale_options.upscaler.data_path
)
if upscale_options.scale == 1:
result_image = Image.blend(
original_image, result_image, upscale_options.upscale_visibility
)
else:
if upscale_options.upscaler is not None and upscale_options.upscaler.name != "None":
original_image = result_image.copy()
logger.info(
"Upscaling with %s scale = %s",
upscale_options.upscaler.name,
upscale_options.scale,
)
result_image = upscale_options.upscaler.scaler.upscale(
image, upscale_options.scale, upscale_options.upscaler.data_path
)
if upscale_options.scale == 1:
result_image = Image.blend(
original_image, result_image, upscale_options.upscale_visibility
)
if upscale_options.face_restorer is not None:
original_image = result_image.copy()
logger.info("Restoring the face with %s", upscale_options.face_restorer.name())
numpy_image = np.array(result_image)
numpy_image = upscale_options.face_restorer.restore(numpy_image)
restored_image = Image.fromarray(numpy_image)
result_image = Image.blend(
original_image, restored_image, upscale_options.restorer_visibility
)
return result_image
def get_face_gender(
face,
face_index,
gender_condition,
operated: str
):
gender = [
x.sex
for x in face
]
gender.reverse()
face_gender = gender[face_index]
logger.info("%s Face %s: Detected Gender -%s-", operated, face_index, face_gender)
if (gender_condition == 1 and face_gender == "F") or (gender_condition == 2 and face_gender == "M"):
logger.info("OK - Detected Gender matches Condition")
try:
return sorted(face, key=lambda x: x.bbox[0])[face_index], 0
except IndexError:
return None, 0
else:
logger.info("WRONG - Detected Gender doesn't match Condition")
return sorted(face, key=lambda x: x.bbox[0])[face_index], 1
def reget_face_single(img_data, det_size, face_index):
det_size_half = (det_size[0] // 2, det_size[1] // 2)
return get_face_single(img_data, face_index=face_index, det_size=det_size_half)
def get_face_single(img_data: np.ndarray, face_index=0, det_size=(640, 640), gender_source=0, gender_target=0):
face_analyser = copy.deepcopy(getAnalysisModel())
face_analyser.prepare(ctx_id=0, det_size=det_size)
face = face_analyser.get(img_data)
buffalo_path = os.path.join(models_path, "insightface/models/buffalo_l.zip")
if os.path.exists(buffalo_path):
os.remove(buffalo_path)
if gender_source != 0:
if len(face) == 0 and det_size[0] > 320 and det_size[1] > 320:
return reget_face_single(img_data, det_size, face_index)
return get_face_gender(face,face_index,gender_source,"Source")
if gender_target != 0:
if len(face) == 0 and det_size[0] > 320 and det_size[1] > 320:
return reget_face_single(img_data, det_size, face_index)
return get_face_gender(face,face_index,gender_target,"Target")
if len(face) == 0 and det_size[0] > 320 and det_size[1] > 320:
return reget_face_single(img_data, det_size, face_index)
try:
return sorted(face, key=lambda x: x.bbox[0])[face_index], 0
except IndexError:
return None, 0
def swap_face(
source_img: Image.Image,
target_img: Image.Image,
model: Union[str, None] = None,
source_faces_index: List[int] = [0],
faces_index: List[int] = [0],
upscale_options: Union[UpscaleOptions, None] = None,
gender_source: int = 0,
gender_target: int = 0,
):
result_image = target_img
if check_process_halt():
return result_image
if model is not None:
if isinstance(source_img, str): # source_img is a base64 string
import base64, io
if 'base64,' in source_img: # check if the base64 string has a data URL scheme
# split the base64 string to get the actual base64 encoded image data
base64_data = source_img.split('base64,')[-1]
# decode base64 string to bytes
img_bytes = base64.b64decode(base64_data)
else:
# if no data URL scheme, just decode
img_bytes = base64.b64decode(source_img)
source_img = Image.open(io.BytesIO(img_bytes))
source_img = cv2.cvtColor(np.array(source_img), cv2.COLOR_RGB2BGR)
target_img = cv2.cvtColor(np.array(target_img), cv2.COLOR_RGB2BGR)
source_face, wrong_gender = get_face_single(source_img, face_index=source_faces_index[0], gender_source=gender_source)
if len(source_faces_index) != 0 and len(source_faces_index) != 1 and len(source_faces_index) != len(faces_index):
logger.info("Source Faces must have no entries (default=0), one entry, or same number of entries as target faces.")
elif source_face is not None:
result = target_img
face_swapper = getFaceSwapModel(model)
source_face_idx = 0
swapped = 0
for face_num in faces_index:
if len(source_faces_index) > 1 and source_face_idx > 0:
source_face, wrong_gender = get_face_single(source_img, face_index=source_faces_index[source_face_idx], gender_source=gender_source)
source_face_idx += 1
if source_face is not None and wrong_gender == 0:
target_face, wrong_gender = get_face_single(target_img, face_index=face_num, gender_target=gender_target)
if target_face is not None and wrong_gender == 0:
result = face_swapper.get(result, target_face, source_face)
swapped += 1
elif wrong_gender == 1:
wrong_gender = 0
if source_face_idx == len(source_faces_index):
result_image = Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB))
if upscale_options is not None:
result_image = upscale_image(result_image, upscale_options)
return result_image
else:
logger.info(f"No target face found for {face_num}")
elif wrong_gender == 1:
wrong_gender = 0
if source_face_idx == len(source_faces_index):
result_image = Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB))
if upscale_options is not None:
result_image = upscale_image(result_image, upscale_options)
return result_image
else:
logger.info(f"No source face found for face number {source_face_idx}.")
result_image = Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB))
if upscale_options is not None and swapped > 0:
result_image = upscale_image(result_image, upscale_options)
else:
logger.info("No source face(s) found")
return result_image

View File

@ -1,7 +1,7 @@
app_title = "ReActor"
version_flag = "v0.4.1-b3"
version_flag = "v0.4.1-b4"
from scripts.logger import logger, get_Run, set_Run
from scripts.reactor_logger import logger, get_Run, set_Run
is_run = get_Run()