From 9a86140cf2d836291b4769e4bfab16e5846cdc22 Mon Sep 17 00:00:00 2001 From: Gourieff <777@lovemet.ru> Date: Fri, 1 Sep 2023 12:38:07 +0700 Subject: [PATCH] UPDATE: More unique filenames To prevent conflicts with other extensions during importing modules and scripts +VersionUP (beta4) --- README.md | 2 +- README_RU.md | 2 +- scripts/console_log_patch.py | 2 +- scripts/reactor_api.py | 4 +- scripts/{faceswap.py => reactor_faceswap.py} | 724 +++++++++---------- scripts/{globals.py => reactor_globals.py} | 0 scripts/{helpers.py => reactor_helpers.py} | 12 +- scripts/{logger.py => reactor_logger.py} | 2 +- scripts/{swapper.py => reactor_swapper.py} | 602 +++++++-------- scripts/{version.py => reactor_version.py} | 4 +- 10 files changed, 672 insertions(+), 682 deletions(-) rename scripts/{faceswap.py => reactor_faceswap.py} (96%) rename scripts/{globals.py => reactor_globals.py} (100%) rename scripts/{helpers.py => reactor_helpers.py} (82%) rename scripts/{logger.py => reactor_logger.py} (97%) rename scripts/{swapper.py => reactor_swapper.py} (97%) rename scripts/{version.py => reactor_version.py} (57%) diff --git a/README.md b/README.md index 8b5d9b0..864437f 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ logo - ![Version](https://img.shields.io/badge/version-0.4.1_beta3-green?style=for-the-badge&labelColor=darkgreen)
+ ![Version](https://img.shields.io/badge/version-0.4.1_beta4-green?style=for-the-badge&labelColor=darkgreen)
[![Commit activity](https://img.shields.io/github/commit-activity/t/Gourieff/sd-webui-reactor/main?cacheSeconds=0)](https://github.com/Gourieff/sd-webui-reactor/commits/main) ![Last commit](https://img.shields.io/github/last-commit/Gourieff/sd-webui-reactor/main?cacheSeconds=0) [![Opened issues](https://img.shields.io/github/issues/Gourieff/sd-webui-reactor?color=red)](https://github.com/Gourieff/sd-webui-reactor/issues?cacheSeconds=0) diff --git a/README_RU.md b/README_RU.md index 160e8e3..81df1f8 100644 --- a/README_RU.md +++ b/README_RU.md @@ -2,7 +2,7 @@ logo - ![Version](https://img.shields.io/badge/версия-0.4.1_beta3-green?style=for-the-badge&labelColor=darkgreen)
+ ![Version](https://img.shields.io/badge/версия-0.4.1_beta4-green?style=for-the-badge&labelColor=darkgreen)
[![Commit activity](https://img.shields.io/github/commit-activity/t/Gourieff/sd-webui-reactor/main?cacheSeconds=0)](https://github.com/Gourieff/sd-webui-reactor/commits/main) ![Last commit](https://img.shields.io/github/last-commit/Gourieff/sd-webui-reactor/main?cacheSeconds=0) [![Opened issues](https://img.shields.io/github/issues/Gourieff/sd-webui-reactor?color=red)](https://github.com/Gourieff/sd-webui-reactor/issues?cacheSeconds=0) diff --git a/scripts/console_log_patch.py b/scripts/console_log_patch.py index 1e737e3..8e9d6db 100644 --- a/scripts/console_log_patch.py +++ b/scripts/console_log_patch.py @@ -14,7 +14,7 @@ from insightface.model_zoo import model_zoo import onnxruntime import onnx from onnx import numpy_helper -from scripts.logger import logger +from scripts.reactor_logger import logger def patched_get_model(self, **kwargs): diff --git a/scripts/reactor_api.py b/scripts/reactor_api.py index 6c2106f..ea3db93 100644 --- a/scripts/reactor_api.py +++ b/scripts/reactor_api.py @@ -14,8 +14,8 @@ from modules.api import api import gradio as gr -from scripts.swapper import UpscaleOptions, swap_face -from scripts.logger import logger +from scripts.reactor_swapper import UpscaleOptions, swap_face +from scripts.reactor_logger import logger def default_file_path(): diff --git a/scripts/faceswap.py b/scripts/reactor_faceswap.py similarity index 96% rename from scripts/faceswap.py rename to scripts/reactor_faceswap.py index 009a400..c68f511 100644 --- a/scripts/faceswap.py +++ b/scripts/reactor_faceswap.py @@ -1,362 +1,362 @@ -import os, glob -import gradio as gr -from PIL import Image - -from typing import List - -import modules.scripts as scripts -from modules.upscaler import Upscaler, UpscalerData -from modules import scripts, shared, images, scripts_postprocessing -from modules.processing import ( - Processed, - StableDiffusionProcessing, - StableDiffusionProcessingImg2Img, -) -from modules.face_restoration import FaceRestoration -from modules.paths_internal import models_path -from modules.images import save_image - -from scripts.logger import logger -from scripts.swapper import UpscaleOptions, swap_face, check_process_halt, reset_messaged -from scripts.version import version_flag, app_title -from scripts.console_log_patch import apply_logging_patch -from scripts.helpers import make_grid - - -MODELS_PATH = None - -def get_models(): - global MODELS_PATH - models_path_init = os.path.join(models_path, "insightface/*") - models = glob.glob(models_path_init) - models = [x for x in models if x.endswith(".onnx") or x.endswith(".pth")] - models_names = [] - for model in models: - model_path = os.path.split(model) - if MODELS_PATH is None: - MODELS_PATH = model_path[0] - model_name = model_path[1] - models_names.append(model_name) - return models_names - - -class FaceSwapScript(scripts.Script): - def title(self): - return f"{app_title}" - - def show(self, is_img2img): - return scripts.AlwaysVisible - - def ui(self, is_img2img): - with gr.Accordion(f"{app_title}", open=False): - with gr.Tab("Main"): - with gr.Column(): - img = gr.inputs.Image(type="pil") - enable = gr.Checkbox(False, label="Enable", info=f"The Fast and Simple FaceSwap Extension - {version_flag}") - save_original = gr.Checkbox(False, label="Save Original", info="Save the original image(s) made before swapping; If you use \"img2img\" - this option will affect with \"Swap in generated\" only") - gr.Markdown("
") - gr.Markdown("Source Image (above):") - with gr.Row(): - source_faces_index = gr.Textbox( - value="0", - placeholder="Which face(s) to use as Source (comma separated)", - label="Comma separated face number(s); Example: 0,2,1", - ) - gender_source = gr.Radio( - ["No", "Female Only", "Male Only"], - value="No", - label="Gender Detection (Source)", - type="index", - ) - gr.Markdown("
") - gr.Markdown("Target Image (result):") - with gr.Row(): - faces_index = gr.Textbox( - value="0", - placeholder="Which face(s) to Swap into Target (comma separated)", - label="Comma separated face number(s); Example: 1,0,2", - ) - gender_target = gr.Radio( - ["No", "Female Only", "Male Only"], - value="No", - label="Gender Detection (Target)", - type="index", - ) - gr.Markdown("
") - with gr.Row(): - face_restorer_name = gr.Radio( - label="Restore Face", - choices=["None"] + [x.name() for x in shared.face_restorers], - value=shared.face_restorers[0].name(), - type="value", - ) - face_restorer_visibility = gr.Slider( - 0, 1, 1, step=0.1, label="Restore Face Visibility" - ) - gr.Markdown("
") - swap_in_source = gr.Checkbox( - False, - label="Swap in source image", - visible=is_img2img, - ) - swap_in_generated = gr.Checkbox( - True, - label="Swap in generated image", - visible=is_img2img, - ) - with gr.Tab("Upscale"): - restore_first = gr.Checkbox( - True, - label="1. Restore Face -> 2. Upscale (-Uncheck- if you want vice versa)", - info="Postprocessing Order" - ) - upscaler_name = gr.inputs.Dropdown( - choices=[upscaler.name for upscaler in shared.sd_upscalers], - label="Upscaler", - ) - gr.Markdown("
") - with gr.Row(): - upscaler_scale = gr.Slider(1, 8, 1, step=0.1, label="Scale by") - upscaler_visibility = gr.Slider( - 0, 1, 1, step=0.1, label="Upscaler Visibility (if scale = 1)" - ) - with gr.Tab("Settings"): - models = get_models() - with gr.Row(): - if len(models) == 0: - logger.warning( - "You should at least have one model in models directory, please read the doc here : https://github.com/Gourieff/sd-webui-reactor/" - ) - model = gr.inputs.Dropdown( - choices=models, - label="Model not found, please download one and reload WebUI", - ) - else: - model = gr.inputs.Dropdown( - choices=models, label="Model", default=models[0] - ) - console_logging_level = gr.Radio( - ["No log", "Minimum", "Default"], - value="Minimum", - label="Console Log Level", - type="index", - ) - - return [ - img, - enable, - source_faces_index, - faces_index, - model, - face_restorer_name, - face_restorer_visibility, - restore_first, - upscaler_name, - upscaler_scale, - upscaler_visibility, - swap_in_source, - swap_in_generated, - console_logging_level, - gender_source, - gender_target, - save_original, - ] - - - @property - def upscaler(self) -> UpscalerData: - for upscaler in shared.sd_upscalers: - if upscaler.name == self.upscaler_name: - return upscaler - return None - - @property - def face_restorer(self) -> FaceRestoration: - for face_restorer in shared.face_restorers: - if face_restorer.name() == self.face_restorer_name: - return face_restorer - return None - - @property - def upscale_options(self) -> UpscaleOptions: - return UpscaleOptions( - do_restore_first = self.restore_first, - scale=self.upscaler_scale, - upscaler=self.upscaler, - face_restorer=self.face_restorer, - upscale_visibility=self.upscaler_visibility, - restorer_visibility=self.face_restorer_visibility, - ) - - def process( - self, - p: StableDiffusionProcessing, - img, - enable, - source_faces_index, - faces_index, - model, - face_restorer_name, - face_restorer_visibility, - restore_first, - upscaler_name, - upscaler_scale, - upscaler_visibility, - swap_in_source, - swap_in_generated, - console_logging_level, - gender_source, - gender_target, - save_original, - ): - self.enable = enable - if self.enable: - - reset_messaged() - if check_process_halt(): - return - - global MODELS_PATH - self.source = img - self.face_restorer_name = face_restorer_name - self.upscaler_scale = upscaler_scale - self.upscaler_visibility = upscaler_visibility - self.face_restorer_visibility = face_restorer_visibility - self.restore_first = restore_first - self.upscaler_name = upscaler_name - self.swap_in_generated = swap_in_generated - self.model = os.path.join(MODELS_PATH,model) - self.console_logging_level = console_logging_level - self.gender_source = gender_source - self.gender_target = gender_target - self.save_original = save_original - if self.gender_source is None or self.gender_source == "No": - self.gender_source = 0 - if self.gender_target is None or self.gender_target == "No": - self.gender_target = 0 - self.source_faces_index = [ - int(x) for x in source_faces_index.strip(",").split(",") if x.isnumeric() - ] - self.faces_index = [ - int(x) for x in faces_index.strip(",").split(",") if x.isnumeric() - ] - if len(self.source_faces_index) == 0: - self.source_faces_index = [0] - if len(self.faces_index) == 0: - self.faces_index = [0] - - if self.source is not None: - apply_logging_patch(console_logging_level) - if isinstance(p, StableDiffusionProcessingImg2Img) and swap_in_source: - logger.info("Working: source face index %s, target face index %s", self.source_faces_index, self.faces_index) - - for i in range(len(p.init_images)): - if len(p.init_images) > 1: - logger.info("Swap in %s", i) - result = swap_face( - self.source, - p.init_images[i], - source_faces_index=self.source_faces_index, - faces_index=self.faces_index, - model=self.model, - upscale_options=self.upscale_options, - gender_source=self.gender_source, - gender_target=self.gender_target, - ) - p.init_images[i] = result - - if shared.state.interrupted or shared.state.skipped: - return - - else: - logger.error("Please provide a source face") - - def postprocess(self, p: StableDiffusionProcessing, processed: Processed, *args): - if self.enable: - - reset_messaged() - if check_process_halt(): - return - - if self.save_original: - - postprocess_run: bool = True - - orig_images : List[Image.Image] = processed.images[processed.index_of_first_image:] - orig_infotexts : List[str] = processed.infotexts[processed.index_of_first_image:] - - result_images: List = processed.images - - if self.swap_in_generated: - logger.info("Working: source face index %s, target face index %s", self.source_faces_index, self.faces_index) - if self.source is not None: - for i,(img,info) in enumerate(zip(orig_images, orig_infotexts)): - if check_process_halt(): - postprocess_run = False - break - if len(orig_images) > 1: - logger.info("Swap in %s", i) - result = swap_face( - self.source, - img, - source_faces_index=self.source_faces_index, - faces_index=self.faces_index, - model=self.model, - upscale_options=self.upscale_options, - gender_source=self.gender_source, - gender_target=self.gender_target, - ) - if result is not None: - suffix = "-swapped" - result_images.append(result) - try: - save_image(result, p.outpath_samples, "", p.all_seeds[0], p.all_prompts[0], "png",info=info, p=p, suffix=suffix) - except: - logger.error("Cannot save a result image - please, check SD WebUI Settings (Saving and Paths)") - else: - logger.error("Cannot create a result image") - - if shared.opts.return_grid and len(result_images) > 2 and postprocess_run: - grid = make_grid(result_images) - result_images.insert(0, grid) - try: - save_image(grid, p.outpath_grids, "grid", p.all_seeds[0], p.all_prompts[0], shared.opts.grid_format, info=info, short_filename=not shared.opts.grid_extended_filename, p=p, grid=True) - except: - logger.error("Cannot save a grid - please, check SD WebUI Settings (Saving and Paths)") - - processed.images = result_images - - def postprocess_batch(self, p, *args, **kwargs): - if self.enable and not self.save_original: - images = kwargs["images"] - - def postprocess_image(self, p, script_pp: scripts.PostprocessImageArgs, *args): - if self.enable and self.swap_in_generated and not self.save_original: - - current_job_number = shared.state.job_no + 1 - job_count = shared.state.job_count - if current_job_number == job_count: - reset_messaged() - if check_process_halt(): - return - - if self.source is not None: - logger.info("Working: source face index %s, target face index %s", self.source_faces_index, self.faces_index) - image: Image.Image = script_pp.image - result = swap_face( - self.source, - image, - source_faces_index=self.source_faces_index, - faces_index=self.faces_index, - model=self.model, - upscale_options=self.upscale_options, - gender_source=self.gender_source, - gender_target=self.gender_target, - ) - try: - pp = scripts_postprocessing.PostprocessedImage(result) - pp.info = {} - p.extra_generation_params.update(pp.info) - script_pp.image = pp.image - except: - logger.error("Cannot create a result image") +import os, glob +import gradio as gr +from PIL import Image + +from typing import List + +import modules.scripts as scripts +from modules.upscaler import Upscaler, UpscalerData +from modules import scripts, shared, images, scripts_postprocessing +from modules.processing import ( + Processed, + StableDiffusionProcessing, + StableDiffusionProcessingImg2Img, +) +from modules.face_restoration import FaceRestoration +from modules.paths_internal import models_path +from modules.images import save_image + +from scripts.reactor_logger import logger +from scripts.reactor_swapper import UpscaleOptions, swap_face, check_process_halt, reset_messaged +from scripts.reactor_version import version_flag, app_title +from scripts.console_log_patch import apply_logging_patch +from scripts.reactor_helpers import make_grid + + +MODELS_PATH = None + +def get_models(): + global MODELS_PATH + models_path_init = os.path.join(models_path, "insightface/*") + models = glob.glob(models_path_init) + models = [x for x in models if x.endswith(".onnx") or x.endswith(".pth")] + models_names = [] + for model in models: + model_path = os.path.split(model) + if MODELS_PATH is None: + MODELS_PATH = model_path[0] + model_name = model_path[1] + models_names.append(model_name) + return models_names + + +class FaceSwapScript(scripts.Script): + def title(self): + return f"{app_title}" + + def show(self, is_img2img): + return scripts.AlwaysVisible + + def ui(self, is_img2img): + with gr.Accordion(f"{app_title}", open=False): + with gr.Tab("Main"): + with gr.Column(): + img = gr.inputs.Image(type="pil") + enable = gr.Checkbox(False, label="Enable", info=f"The Fast and Simple FaceSwap Extension - {version_flag}") + save_original = gr.Checkbox(False, label="Save Original", info="Save the original image(s) made before swapping; If you use \"img2img\" - this option will affect with \"Swap in generated\" only") + gr.Markdown("
") + gr.Markdown("Source Image (above):") + with gr.Row(): + source_faces_index = gr.Textbox( + value="0", + placeholder="Which face(s) to use as Source (comma separated)", + label="Comma separated face number(s); Example: 0,2,1", + ) + gender_source = gr.Radio( + ["No", "Female Only", "Male Only"], + value="No", + label="Gender Detection (Source)", + type="index", + ) + gr.Markdown("
") + gr.Markdown("Target Image (result):") + with gr.Row(): + faces_index = gr.Textbox( + value="0", + placeholder="Which face(s) to Swap into Target (comma separated)", + label="Comma separated face number(s); Example: 1,0,2", + ) + gender_target = gr.Radio( + ["No", "Female Only", "Male Only"], + value="No", + label="Gender Detection (Target)", + type="index", + ) + gr.Markdown("
") + with gr.Row(): + face_restorer_name = gr.Radio( + label="Restore Face", + choices=["None"] + [x.name() for x in shared.face_restorers], + value=shared.face_restorers[0].name(), + type="value", + ) + face_restorer_visibility = gr.Slider( + 0, 1, 1, step=0.1, label="Restore Face Visibility" + ) + gr.Markdown("
") + swap_in_source = gr.Checkbox( + False, + label="Swap in source image", + visible=is_img2img, + ) + swap_in_generated = gr.Checkbox( + True, + label="Swap in generated image", + visible=is_img2img, + ) + with gr.Tab("Upscale"): + restore_first = gr.Checkbox( + True, + label="1. Restore Face -> 2. Upscale (-Uncheck- if you want vice versa)", + info="Postprocessing Order" + ) + upscaler_name = gr.inputs.Dropdown( + choices=[upscaler.name for upscaler in shared.sd_upscalers], + label="Upscaler", + ) + gr.Markdown("
") + with gr.Row(): + upscaler_scale = gr.Slider(1, 8, 1, step=0.1, label="Scale by") + upscaler_visibility = gr.Slider( + 0, 1, 1, step=0.1, label="Upscaler Visibility (if scale = 1)" + ) + with gr.Tab("Settings"): + models = get_models() + with gr.Row(): + if len(models) == 0: + logger.warning( + "You should at least have one model in models directory, please read the doc here : https://github.com/Gourieff/sd-webui-reactor/" + ) + model = gr.inputs.Dropdown( + choices=models, + label="Model not found, please download one and reload WebUI", + ) + else: + model = gr.inputs.Dropdown( + choices=models, label="Model", default=models[0] + ) + console_logging_level = gr.Radio( + ["No log", "Minimum", "Default"], + value="Minimum", + label="Console Log Level", + type="index", + ) + + return [ + img, + enable, + source_faces_index, + faces_index, + model, + face_restorer_name, + face_restorer_visibility, + restore_first, + upscaler_name, + upscaler_scale, + upscaler_visibility, + swap_in_source, + swap_in_generated, + console_logging_level, + gender_source, + gender_target, + save_original, + ] + + + @property + def upscaler(self) -> UpscalerData: + for upscaler in shared.sd_upscalers: + if upscaler.name == self.upscaler_name: + return upscaler + return None + + @property + def face_restorer(self) -> FaceRestoration: + for face_restorer in shared.face_restorers: + if face_restorer.name() == self.face_restorer_name: + return face_restorer + return None + + @property + def upscale_options(self) -> UpscaleOptions: + return UpscaleOptions( + do_restore_first = self.restore_first, + scale=self.upscaler_scale, + upscaler=self.upscaler, + face_restorer=self.face_restorer, + upscale_visibility=self.upscaler_visibility, + restorer_visibility=self.face_restorer_visibility, + ) + + def process( + self, + p: StableDiffusionProcessing, + img, + enable, + source_faces_index, + faces_index, + model, + face_restorer_name, + face_restorer_visibility, + restore_first, + upscaler_name, + upscaler_scale, + upscaler_visibility, + swap_in_source, + swap_in_generated, + console_logging_level, + gender_source, + gender_target, + save_original, + ): + self.enable = enable + if self.enable: + + reset_messaged() + if check_process_halt(): + return + + global MODELS_PATH + self.source = img + self.face_restorer_name = face_restorer_name + self.upscaler_scale = upscaler_scale + self.upscaler_visibility = upscaler_visibility + self.face_restorer_visibility = face_restorer_visibility + self.restore_first = restore_first + self.upscaler_name = upscaler_name + self.swap_in_generated = swap_in_generated + self.model = os.path.join(MODELS_PATH,model) + self.console_logging_level = console_logging_level + self.gender_source = gender_source + self.gender_target = gender_target + self.save_original = save_original + if self.gender_source is None or self.gender_source == "No": + self.gender_source = 0 + if self.gender_target is None or self.gender_target == "No": + self.gender_target = 0 + self.source_faces_index = [ + int(x) for x in source_faces_index.strip(",").split(",") if x.isnumeric() + ] + self.faces_index = [ + int(x) for x in faces_index.strip(",").split(",") if x.isnumeric() + ] + if len(self.source_faces_index) == 0: + self.source_faces_index = [0] + if len(self.faces_index) == 0: + self.faces_index = [0] + + if self.source is not None: + apply_logging_patch(console_logging_level) + if isinstance(p, StableDiffusionProcessingImg2Img) and swap_in_source: + logger.info("Working: source face index %s, target face index %s", self.source_faces_index, self.faces_index) + + for i in range(len(p.init_images)): + if len(p.init_images) > 1: + logger.info("Swap in %s", i) + result = swap_face( + self.source, + p.init_images[i], + source_faces_index=self.source_faces_index, + faces_index=self.faces_index, + model=self.model, + upscale_options=self.upscale_options, + gender_source=self.gender_source, + gender_target=self.gender_target, + ) + p.init_images[i] = result + + if shared.state.interrupted or shared.state.skipped: + return + + else: + logger.error("Please provide a source face") + + def postprocess(self, p: StableDiffusionProcessing, processed: Processed, *args): + if self.enable: + + reset_messaged() + if check_process_halt(): + return + + if self.save_original: + + postprocess_run: bool = True + + orig_images : List[Image.Image] = processed.images[processed.index_of_first_image:] + orig_infotexts : List[str] = processed.infotexts[processed.index_of_first_image:] + + result_images: List = processed.images + + if self.swap_in_generated: + logger.info("Working: source face index %s, target face index %s", self.source_faces_index, self.faces_index) + if self.source is not None: + for i,(img,info) in enumerate(zip(orig_images, orig_infotexts)): + if check_process_halt(): + postprocess_run = False + break + if len(orig_images) > 1: + logger.info("Swap in %s", i) + result = swap_face( + self.source, + img, + source_faces_index=self.source_faces_index, + faces_index=self.faces_index, + model=self.model, + upscale_options=self.upscale_options, + gender_source=self.gender_source, + gender_target=self.gender_target, + ) + if result is not None: + suffix = "-swapped" + result_images.append(result) + try: + save_image(result, p.outpath_samples, "", p.all_seeds[0], p.all_prompts[0], "png",info=info, p=p, suffix=suffix) + except: + logger.error("Cannot save a result image - please, check SD WebUI Settings (Saving and Paths)") + else: + logger.error("Cannot create a result image") + + if shared.opts.return_grid and len(result_images) > 2 and postprocess_run: + grid = make_grid(result_images) + result_images.insert(0, grid) + try: + save_image(grid, p.outpath_grids, "grid", p.all_seeds[0], p.all_prompts[0], shared.opts.grid_format, info=info, short_filename=not shared.opts.grid_extended_filename, p=p, grid=True) + except: + logger.error("Cannot save a grid - please, check SD WebUI Settings (Saving and Paths)") + + processed.images = result_images + + def postprocess_batch(self, p, *args, **kwargs): + if self.enable and not self.save_original: + images = kwargs["images"] + + def postprocess_image(self, p, script_pp: scripts.PostprocessImageArgs, *args): + if self.enable and self.swap_in_generated and not self.save_original: + + current_job_number = shared.state.job_no + 1 + job_count = shared.state.job_count + if current_job_number == job_count: + reset_messaged() + if check_process_halt(): + return + + if self.source is not None: + logger.info("Working: source face index %s, target face index %s", self.source_faces_index, self.faces_index) + image: Image.Image = script_pp.image + result = swap_face( + self.source, + image, + source_faces_index=self.source_faces_index, + faces_index=self.faces_index, + model=self.model, + upscale_options=self.upscale_options, + gender_source=self.gender_source, + gender_target=self.gender_target, + ) + try: + pp = scripts_postprocessing.PostprocessedImage(result) + pp.info = {} + p.extra_generation_params.update(pp.info) + script_pp.image = pp.image + except: + logger.error("Cannot create a result image") diff --git a/scripts/globals.py b/scripts/reactor_globals.py similarity index 100% rename from scripts/globals.py rename to scripts/reactor_globals.py diff --git a/scripts/helpers.py b/scripts/reactor_helpers.py similarity index 82% rename from scripts/helpers.py rename to scripts/reactor_helpers.py index 0f5eb59..672418b 100644 --- a/scripts/helpers.py +++ b/scripts/reactor_helpers.py @@ -4,16 +4,6 @@ from math import isqrt, ceil from typing import List def make_grid(image_list: List): - """ - Creates a square image by combining multiple images in a grid pattern. - - Args: - image_list (list): List of PIL Image objects to be combined. - - Returns: - PIL Image object: The resulting square image. - None: If the image_list is empty or contains only one image. - """ # Count the occurrences of each image size in the image_list size_counter = Counter(image.size for image in image_list) @@ -52,4 +42,4 @@ def make_grid(image_list: List): return square_image # Return None if there are no images or only one image in the image_list - return None \ No newline at end of file + return None diff --git a/scripts/logger.py b/scripts/reactor_logger.py similarity index 97% rename from scripts/logger.py rename to scripts/reactor_logger.py index 690df55..b8e33f0 100644 --- a/scripts/logger.py +++ b/scripts/reactor_logger.py @@ -3,7 +3,7 @@ import copy import sys from modules import shared -from scripts.globals import IS_RUN +from scripts.reactor_globals import IS_RUN class ColoredFormatter(logging.Formatter): diff --git a/scripts/swapper.py b/scripts/reactor_swapper.py similarity index 97% rename from scripts/swapper.py rename to scripts/reactor_swapper.py index 4047302..6bb0014 100644 --- a/scripts/swapper.py +++ b/scripts/reactor_swapper.py @@ -1,301 +1,301 @@ -import copy -import os -from dataclasses import dataclass -from typing import List, Union - -import cv2 -import numpy as np -from PIL import Image - -import insightface -import onnxruntime - -from modules.face_restoration import FaceRestoration -from modules.upscaler import UpscalerData -from modules.shared import state -from modules.paths_internal import models_path -from scripts.logger import logger - -import warnings - -np.warnings = warnings -np.warnings.filterwarnings('ignore') - -providers = onnxruntime.get_available_providers() - - -@dataclass -class UpscaleOptions: - do_restore_first: bool = True - scale: int = 1 - upscaler: UpscalerData = None - upscale_visibility: float = 0.5 - face_restorer: FaceRestoration = None - restorer_visibility: float = 0.5 - - -def cosine_distance(vector1: np.ndarray, vector2: np.ndarray) -> float: - vec1 = vector1.flatten() - vec2 = vector2.flatten() - - dot_product = np.dot(vec1, vec2) - norm1 = np.linalg.norm(vec1) - norm2 = np.linalg.norm(vec2) - - cosine_distance = 1 - (dot_product / (norm1 * norm2)) - return cosine_distance - - -def cosine_similarity(test_vec: np.ndarray, source_vecs: List[np.ndarray]) -> float: - cos_dist = sum(cosine_distance(test_vec, source_vec) for source_vec in source_vecs) - average_cos_dist = cos_dist / len(source_vecs) - return average_cos_dist - - -MESSAGED_STOPPED = False -MESSAGED_SKIPPED = False - -def reset_messaged(): - global MESSAGED_STOPPED, MESSAGED_SKIPPED - if not state.interrupted: - MESSAGED_STOPPED = False - if not state.skipped: - MESSAGED_SKIPPED = False - -def check_process_halt(msgforced: bool = False): - global MESSAGED_STOPPED, MESSAGED_SKIPPED - if state.interrupted: - if not MESSAGED_STOPPED or msgforced: - logger.info("Stopped by User") - MESSAGED_STOPPED = True - return True - if state.skipped: - if not MESSAGED_SKIPPED or msgforced: - logger.info("Skipped by User") - MESSAGED_SKIPPED = True - return True - return False - - -FS_MODEL = None -CURRENT_FS_MODEL_PATH = None - -ANALYSIS_MODEL = None - - -def getAnalysisModel(): - global ANALYSIS_MODEL - if ANALYSIS_MODEL is None: - ANALYSIS_MODEL = insightface.app.FaceAnalysis( - name="buffalo_l", providers=providers, root=os.path.join(models_path, "insightface") # note: allowed_modules=['detection', 'genderage'] - ) - return ANALYSIS_MODEL - - -def getFaceSwapModel(model_path: str): - global FS_MODEL - global CURRENT_FS_MODEL_PATH - if CURRENT_FS_MODEL_PATH is None or CURRENT_FS_MODEL_PATH != model_path: - CURRENT_FS_MODEL_PATH = model_path - FS_MODEL = insightface.model_zoo.get_model(model_path, providers=providers) - - return FS_MODEL - - -def upscale_image(image: Image, upscale_options: UpscaleOptions): - result_image = image - - if check_process_halt(msgforced=True): - return result_image - - if upscale_options.do_restore_first: - if upscale_options.face_restorer is not None: - original_image = result_image.copy() - logger.info("Restoring the face with %s", upscale_options.face_restorer.name()) - numpy_image = np.array(result_image) - numpy_image = upscale_options.face_restorer.restore(numpy_image) - restored_image = Image.fromarray(numpy_image) - result_image = Image.blend( - original_image, restored_image, upscale_options.restorer_visibility - ) - if upscale_options.upscaler is not None and upscale_options.upscaler.name != "None": - original_image = result_image.copy() - logger.info( - "Upscaling with %s scale = %s", - upscale_options.upscaler.name, - upscale_options.scale, - ) - result_image = upscale_options.upscaler.scaler.upscale( - original_image, upscale_options.scale, upscale_options.upscaler.data_path - ) - if upscale_options.scale == 1: - result_image = Image.blend( - original_image, result_image, upscale_options.upscale_visibility - ) - else: - if upscale_options.upscaler is not None and upscale_options.upscaler.name != "None": - original_image = result_image.copy() - logger.info( - "Upscaling with %s scale = %s", - upscale_options.upscaler.name, - upscale_options.scale, - ) - result_image = upscale_options.upscaler.scaler.upscale( - image, upscale_options.scale, upscale_options.upscaler.data_path - ) - if upscale_options.scale == 1: - result_image = Image.blend( - original_image, result_image, upscale_options.upscale_visibility - ) - if upscale_options.face_restorer is not None: - original_image = result_image.copy() - logger.info("Restoring the face with %s", upscale_options.face_restorer.name()) - numpy_image = np.array(result_image) - numpy_image = upscale_options.face_restorer.restore(numpy_image) - restored_image = Image.fromarray(numpy_image) - result_image = Image.blend( - original_image, restored_image, upscale_options.restorer_visibility - ) - - return result_image - - -def get_face_gender( - face, - face_index, - gender_condition, - operated: str -): - gender = [ - x.sex - for x in face - ] - gender.reverse() - face_gender = gender[face_index] - logger.info("%s Face %s: Detected Gender -%s-", operated, face_index, face_gender) - if (gender_condition == 1 and face_gender == "F") or (gender_condition == 2 and face_gender == "M"): - logger.info("OK - Detected Gender matches Condition") - try: - return sorted(face, key=lambda x: x.bbox[0])[face_index], 0 - except IndexError: - return None, 0 - else: - logger.info("WRONG - Detected Gender doesn't match Condition") - return sorted(face, key=lambda x: x.bbox[0])[face_index], 1 - - -def reget_face_single(img_data, det_size, face_index): - det_size_half = (det_size[0] // 2, det_size[1] // 2) - return get_face_single(img_data, face_index=face_index, det_size=det_size_half) - - -def get_face_single(img_data: np.ndarray, face_index=0, det_size=(640, 640), gender_source=0, gender_target=0): - face_analyser = copy.deepcopy(getAnalysisModel()) - face_analyser.prepare(ctx_id=0, det_size=det_size) - face = face_analyser.get(img_data) - - buffalo_path = os.path.join(models_path, "insightface/models/buffalo_l.zip") - if os.path.exists(buffalo_path): - os.remove(buffalo_path) - - if gender_source != 0: - if len(face) == 0 and det_size[0] > 320 and det_size[1] > 320: - return reget_face_single(img_data, det_size, face_index) - return get_face_gender(face,face_index,gender_source,"Source") - - if gender_target != 0: - if len(face) == 0 and det_size[0] > 320 and det_size[1] > 320: - return reget_face_single(img_data, det_size, face_index) - return get_face_gender(face,face_index,gender_target,"Target") - - if len(face) == 0 and det_size[0] > 320 and det_size[1] > 320: - return reget_face_single(img_data, det_size, face_index) - - try: - return sorted(face, key=lambda x: x.bbox[0])[face_index], 0 - except IndexError: - return None, 0 - - -def swap_face( - source_img: Image.Image, - target_img: Image.Image, - model: Union[str, None] = None, - source_faces_index: List[int] = [0], - faces_index: List[int] = [0], - upscale_options: Union[UpscaleOptions, None] = None, - gender_source: int = 0, - gender_target: int = 0, -): - result_image = target_img - - if check_process_halt(): - return result_image - - if model is not None: - - if isinstance(source_img, str): # source_img is a base64 string - import base64, io - if 'base64,' in source_img: # check if the base64 string has a data URL scheme - # split the base64 string to get the actual base64 encoded image data - base64_data = source_img.split('base64,')[-1] - # decode base64 string to bytes - img_bytes = base64.b64decode(base64_data) - else: - # if no data URL scheme, just decode - img_bytes = base64.b64decode(source_img) - - source_img = Image.open(io.BytesIO(img_bytes)) - - source_img = cv2.cvtColor(np.array(source_img), cv2.COLOR_RGB2BGR) - target_img = cv2.cvtColor(np.array(target_img), cv2.COLOR_RGB2BGR) - - source_face, wrong_gender = get_face_single(source_img, face_index=source_faces_index[0], gender_source=gender_source) - - if len(source_faces_index) != 0 and len(source_faces_index) != 1 and len(source_faces_index) != len(faces_index): - logger.info("Source Faces must have no entries (default=0), one entry, or same number of entries as target faces.") - elif source_face is not None: - - result = target_img - face_swapper = getFaceSwapModel(model) - - source_face_idx = 0 - - swapped = 0 - - for face_num in faces_index: - if len(source_faces_index) > 1 and source_face_idx > 0: - source_face, wrong_gender = get_face_single(source_img, face_index=source_faces_index[source_face_idx], gender_source=gender_source) - source_face_idx += 1 - - if source_face is not None and wrong_gender == 0: - target_face, wrong_gender = get_face_single(target_img, face_index=face_num, gender_target=gender_target) - if target_face is not None and wrong_gender == 0: - result = face_swapper.get(result, target_face, source_face) - swapped += 1 - elif wrong_gender == 1: - wrong_gender = 0 - if source_face_idx == len(source_faces_index): - result_image = Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB)) - if upscale_options is not None: - result_image = upscale_image(result_image, upscale_options) - return result_image - else: - logger.info(f"No target face found for {face_num}") - elif wrong_gender == 1: - wrong_gender = 0 - if source_face_idx == len(source_faces_index): - result_image = Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB)) - if upscale_options is not None: - result_image = upscale_image(result_image, upscale_options) - return result_image - else: - logger.info(f"No source face found for face number {source_face_idx}.") - - result_image = Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB)) - if upscale_options is not None and swapped > 0: - result_image = upscale_image(result_image, upscale_options) - - else: - logger.info("No source face(s) found") - return result_image +import copy +import os +from dataclasses import dataclass +from typing import List, Union + +import cv2 +import numpy as np +from PIL import Image + +import insightface +import onnxruntime + +from modules.face_restoration import FaceRestoration +from modules.upscaler import UpscalerData +from modules.shared import state +from modules.paths_internal import models_path +from scripts.reactor_logger import logger + +import warnings + +np.warnings = warnings +np.warnings.filterwarnings('ignore') + +providers = onnxruntime.get_available_providers() + + +@dataclass +class UpscaleOptions: + do_restore_first: bool = True + scale: int = 1 + upscaler: UpscalerData = None + upscale_visibility: float = 0.5 + face_restorer: FaceRestoration = None + restorer_visibility: float = 0.5 + + +def cosine_distance(vector1: np.ndarray, vector2: np.ndarray) -> float: + vec1 = vector1.flatten() + vec2 = vector2.flatten() + + dot_product = np.dot(vec1, vec2) + norm1 = np.linalg.norm(vec1) + norm2 = np.linalg.norm(vec2) + + cosine_distance = 1 - (dot_product / (norm1 * norm2)) + return cosine_distance + + +def cosine_similarity(test_vec: np.ndarray, source_vecs: List[np.ndarray]) -> float: + cos_dist = sum(cosine_distance(test_vec, source_vec) for source_vec in source_vecs) + average_cos_dist = cos_dist / len(source_vecs) + return average_cos_dist + + +MESSAGED_STOPPED = False +MESSAGED_SKIPPED = False + +def reset_messaged(): + global MESSAGED_STOPPED, MESSAGED_SKIPPED + if not state.interrupted: + MESSAGED_STOPPED = False + if not state.skipped: + MESSAGED_SKIPPED = False + +def check_process_halt(msgforced: bool = False): + global MESSAGED_STOPPED, MESSAGED_SKIPPED + if state.interrupted: + if not MESSAGED_STOPPED or msgforced: + logger.info("Stopped by User") + MESSAGED_STOPPED = True + return True + if state.skipped: + if not MESSAGED_SKIPPED or msgforced: + logger.info("Skipped by User") + MESSAGED_SKIPPED = True + return True + return False + + +FS_MODEL = None +CURRENT_FS_MODEL_PATH = None + +ANALYSIS_MODEL = None + + +def getAnalysisModel(): + global ANALYSIS_MODEL + if ANALYSIS_MODEL is None: + ANALYSIS_MODEL = insightface.app.FaceAnalysis( + name="buffalo_l", providers=providers, root=os.path.join(models_path, "insightface") # note: allowed_modules=['detection', 'genderage'] + ) + return ANALYSIS_MODEL + + +def getFaceSwapModel(model_path: str): + global FS_MODEL + global CURRENT_FS_MODEL_PATH + if CURRENT_FS_MODEL_PATH is None or CURRENT_FS_MODEL_PATH != model_path: + CURRENT_FS_MODEL_PATH = model_path + FS_MODEL = insightface.model_zoo.get_model(model_path, providers=providers) + + return FS_MODEL + + +def upscale_image(image: Image, upscale_options: UpscaleOptions): + result_image = image + + if check_process_halt(msgforced=True): + return result_image + + if upscale_options.do_restore_first: + if upscale_options.face_restorer is not None: + original_image = result_image.copy() + logger.info("Restoring the face with %s", upscale_options.face_restorer.name()) + numpy_image = np.array(result_image) + numpy_image = upscale_options.face_restorer.restore(numpy_image) + restored_image = Image.fromarray(numpy_image) + result_image = Image.blend( + original_image, restored_image, upscale_options.restorer_visibility + ) + if upscale_options.upscaler is not None and upscale_options.upscaler.name != "None": + original_image = result_image.copy() + logger.info( + "Upscaling with %s scale = %s", + upscale_options.upscaler.name, + upscale_options.scale, + ) + result_image = upscale_options.upscaler.scaler.upscale( + original_image, upscale_options.scale, upscale_options.upscaler.data_path + ) + if upscale_options.scale == 1: + result_image = Image.blend( + original_image, result_image, upscale_options.upscale_visibility + ) + else: + if upscale_options.upscaler is not None and upscale_options.upscaler.name != "None": + original_image = result_image.copy() + logger.info( + "Upscaling with %s scale = %s", + upscale_options.upscaler.name, + upscale_options.scale, + ) + result_image = upscale_options.upscaler.scaler.upscale( + image, upscale_options.scale, upscale_options.upscaler.data_path + ) + if upscale_options.scale == 1: + result_image = Image.blend( + original_image, result_image, upscale_options.upscale_visibility + ) + if upscale_options.face_restorer is not None: + original_image = result_image.copy() + logger.info("Restoring the face with %s", upscale_options.face_restorer.name()) + numpy_image = np.array(result_image) + numpy_image = upscale_options.face_restorer.restore(numpy_image) + restored_image = Image.fromarray(numpy_image) + result_image = Image.blend( + original_image, restored_image, upscale_options.restorer_visibility + ) + + return result_image + + +def get_face_gender( + face, + face_index, + gender_condition, + operated: str +): + gender = [ + x.sex + for x in face + ] + gender.reverse() + face_gender = gender[face_index] + logger.info("%s Face %s: Detected Gender -%s-", operated, face_index, face_gender) + if (gender_condition == 1 and face_gender == "F") or (gender_condition == 2 and face_gender == "M"): + logger.info("OK - Detected Gender matches Condition") + try: + return sorted(face, key=lambda x: x.bbox[0])[face_index], 0 + except IndexError: + return None, 0 + else: + logger.info("WRONG - Detected Gender doesn't match Condition") + return sorted(face, key=lambda x: x.bbox[0])[face_index], 1 + + +def reget_face_single(img_data, det_size, face_index): + det_size_half = (det_size[0] // 2, det_size[1] // 2) + return get_face_single(img_data, face_index=face_index, det_size=det_size_half) + + +def get_face_single(img_data: np.ndarray, face_index=0, det_size=(640, 640), gender_source=0, gender_target=0): + face_analyser = copy.deepcopy(getAnalysisModel()) + face_analyser.prepare(ctx_id=0, det_size=det_size) + face = face_analyser.get(img_data) + + buffalo_path = os.path.join(models_path, "insightface/models/buffalo_l.zip") + if os.path.exists(buffalo_path): + os.remove(buffalo_path) + + if gender_source != 0: + if len(face) == 0 and det_size[0] > 320 and det_size[1] > 320: + return reget_face_single(img_data, det_size, face_index) + return get_face_gender(face,face_index,gender_source,"Source") + + if gender_target != 0: + if len(face) == 0 and det_size[0] > 320 and det_size[1] > 320: + return reget_face_single(img_data, det_size, face_index) + return get_face_gender(face,face_index,gender_target,"Target") + + if len(face) == 0 and det_size[0] > 320 and det_size[1] > 320: + return reget_face_single(img_data, det_size, face_index) + + try: + return sorted(face, key=lambda x: x.bbox[0])[face_index], 0 + except IndexError: + return None, 0 + + +def swap_face( + source_img: Image.Image, + target_img: Image.Image, + model: Union[str, None] = None, + source_faces_index: List[int] = [0], + faces_index: List[int] = [0], + upscale_options: Union[UpscaleOptions, None] = None, + gender_source: int = 0, + gender_target: int = 0, +): + result_image = target_img + + if check_process_halt(): + return result_image + + if model is not None: + + if isinstance(source_img, str): # source_img is a base64 string + import base64, io + if 'base64,' in source_img: # check if the base64 string has a data URL scheme + # split the base64 string to get the actual base64 encoded image data + base64_data = source_img.split('base64,')[-1] + # decode base64 string to bytes + img_bytes = base64.b64decode(base64_data) + else: + # if no data URL scheme, just decode + img_bytes = base64.b64decode(source_img) + + source_img = Image.open(io.BytesIO(img_bytes)) + + source_img = cv2.cvtColor(np.array(source_img), cv2.COLOR_RGB2BGR) + target_img = cv2.cvtColor(np.array(target_img), cv2.COLOR_RGB2BGR) + + source_face, wrong_gender = get_face_single(source_img, face_index=source_faces_index[0], gender_source=gender_source) + + if len(source_faces_index) != 0 and len(source_faces_index) != 1 and len(source_faces_index) != len(faces_index): + logger.info("Source Faces must have no entries (default=0), one entry, or same number of entries as target faces.") + elif source_face is not None: + + result = target_img + face_swapper = getFaceSwapModel(model) + + source_face_idx = 0 + + swapped = 0 + + for face_num in faces_index: + if len(source_faces_index) > 1 and source_face_idx > 0: + source_face, wrong_gender = get_face_single(source_img, face_index=source_faces_index[source_face_idx], gender_source=gender_source) + source_face_idx += 1 + + if source_face is not None and wrong_gender == 0: + target_face, wrong_gender = get_face_single(target_img, face_index=face_num, gender_target=gender_target) + if target_face is not None and wrong_gender == 0: + result = face_swapper.get(result, target_face, source_face) + swapped += 1 + elif wrong_gender == 1: + wrong_gender = 0 + if source_face_idx == len(source_faces_index): + result_image = Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB)) + if upscale_options is not None: + result_image = upscale_image(result_image, upscale_options) + return result_image + else: + logger.info(f"No target face found for {face_num}") + elif wrong_gender == 1: + wrong_gender = 0 + if source_face_idx == len(source_faces_index): + result_image = Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB)) + if upscale_options is not None: + result_image = upscale_image(result_image, upscale_options) + return result_image + else: + logger.info(f"No source face found for face number {source_face_idx}.") + + result_image = Image.fromarray(cv2.cvtColor(result, cv2.COLOR_BGR2RGB)) + if upscale_options is not None and swapped > 0: + result_image = upscale_image(result_image, upscale_options) + + else: + logger.info("No source face(s) found") + return result_image diff --git a/scripts/version.py b/scripts/reactor_version.py similarity index 57% rename from scripts/version.py rename to scripts/reactor_version.py index 61b56b8..612f35c 100644 --- a/scripts/version.py +++ b/scripts/reactor_version.py @@ -1,7 +1,7 @@ app_title = "ReActor" -version_flag = "v0.4.1-b3" +version_flag = "v0.4.1-b4" -from scripts.logger import logger, get_Run, set_Run +from scripts.reactor_logger import logger, get_Run, set_Run is_run = get_Run()