From cb2ac5f7a672faeb7b81886b6c1f1481ef51f90d Mon Sep 17 00:00:00 2001 From: Martin Bielik Date: Sun, 15 Dec 2024 23:32:55 +0100 Subject: refactoring: import python when needed, run as functions --- py/chat.py | 118 +++++++++++++++++++++++++-------------------------- py/complete.py | 98 +++++++++++++++++++++--------------------- py/config.py | 131 --------------------------------------------------------- py/context.py | 125 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ py/roles.py | 26 +++++------- py/utils.py | 46 +++++++++----------- 6 files changed, 260 insertions(+), 284 deletions(-) delete mode 100644 py/config.py create mode 100644 py/context.py (limited to 'py') diff --git a/py/chat.py b/py/chat.py index 9dd4d4e..79457ee 100644 --- a/py/chat.py +++ b/py/chat.py @@ -1,76 +1,74 @@ import vim -# import utils -plugin_root = vim.eval("s:plugin_root") -vim.command(f"py3file {plugin_root}/py/utils.py") +chat_py_imported = True -prompt = vim.eval("l:prompt") -config = make_config(vim.eval("l:config")) -config_options = config['options'] -config_ui = config['ui'] +def run_ai_chat(context): + prompt = context['prompt'] + config = make_config(context['config']) + config_options = config['options'] + config_ui = config['ui'] -def initialize_chat_window(): - lines = vim.eval('getline(1, "$")') - contains_user_prompt = '>>> user' in lines - if not contains_user_prompt: - # user role not found, put whole file content as an user prompt - vim.command("normal! gg") - populates_options = config_ui['populate_options'] == '1' - if populates_options: - vim.command("normal! O[chat-options]") - vim.command("normal! o") - for key, value in config_options.items(): - if key == 'initial_prompt': - value = "\\n".join(value) - vim.command("normal! i" + key + "=" + value + "\n") - vim.command("normal! " + ("o" if populates_options else "O")) - vim.command("normal! i>>> user\n") - - vim.command("normal! G") - vim_break_undo_sequence() - vim.command("redraw") - - file_content = vim.eval('trim(join(getline(1, "$"), "\n"))') - role_lines = re.findall(r'(^>>> user|^>>> system|^<<< assistant).*', file_content, flags=re.MULTILINE) - if not role_lines[-1].startswith(">>> user"): - # last role is not user, most likely completion was cancelled before - vim.command("normal! o") - vim.command("normal! i\n>>> user\n\n") + def initialize_chat_window(): + lines = vim.eval('getline(1, "$")') + contains_user_prompt = '>>> user' in lines + if not contains_user_prompt: + # user role not found, put whole file content as an user prompt + vim.command("normal! gg") + populates_options = config_ui['populate_options'] == '1' + if populates_options: + vim.command("normal! O[chat-options]") + vim.command("normal! o") + for key, value in config_options.items(): + if key == 'initial_prompt': + value = "\\n".join(value) + vim.command("normal! i" + key + "=" + value + "\n") + vim.command("normal! " + ("o" if populates_options else "O")) + vim.command("normal! i>>> user\n") - if prompt: - vim.command("normal! i" + prompt) + vim.command("normal! G") vim_break_undo_sequence() vim.command("redraw") -initialize_chat_window() + file_content = vim.eval('trim(join(getline(1, "$"), "\n"))') + role_lines = re.findall(r'(^>>> user|^>>> system|^<<< assistant).*', file_content, flags=re.MULTILINE) + if not role_lines[-1].startswith(">>> user"): + # last role is not user, most likely completion was cancelled before + vim.command("normal! o") + vim.command("normal! i\n>>> user\n\n") -chat_options = parse_chat_header_options() -options = {**config_options, **chat_options} + if prompt: + vim.command("normal! i" + prompt) + vim_break_undo_sequence() + vim.command("redraw") -initial_prompt = '\n'.join(options.get('initial_prompt', [])) -initial_messages = parse_chat_messages(initial_prompt) + initialize_chat_window() -chat_content = vim.eval('trim(join(getline(1, "$"), "\n"))') -printDebug("[chat] text:\n" + chat_content) -chat_messages = parse_chat_messages(chat_content) -is_selection = vim.eval("l:is_selection") + chat_options = parse_chat_header_options() + options = {**config_options, **chat_options} -messages = initial_messages + chat_messages + initial_prompt = '\n'.join(options.get('initial_prompt', [])) + initial_messages = parse_chat_messages(initial_prompt) -try: - if messages[-1]["content"].strip(): - vim.command("normal! Go\n<<< assistant\n\n") - vim.command("redraw") + chat_content = vim.eval('trim(join(getline(1, "$"), "\n"))') + print_debug("[chat] text:\n" + chat_content) + chat_messages = parse_chat_messages(chat_content) - print('Answering...') - vim.command("redraw") + messages = initial_messages + chat_messages - text_chunks = make_chat_text_chunks(messages, options) - render_text_chunks(text_chunks, is_selection) + try: + if messages[-1]["content"].strip(): + vim.command("normal! Go\n<<< assistant\n\n") + vim.command("redraw") - vim.command("normal! a\n\n>>> user\n\n") - vim.command("redraw") - clear_echo_message() -except BaseException as error: - handle_completion_error(error) - printDebug("[chat] error: {}", traceback.format_exc()) + print('Answering...') + vim.command("redraw") + + text_chunks = make_chat_text_chunks(messages, options) + render_text_chunks(text_chunks) + + vim.command("normal! a\n\n>>> user\n\n") + vim.command("redraw") + clear_echo_message() + except BaseException as error: + handle_completion_error(error) + print_debug("[chat] error: {}", traceback.format_exc()) diff --git a/py/complete.py b/py/complete.py index a7b9569..8078dea 100644 --- a/py/complete.py +++ b/py/complete.py @@ -1,52 +1,50 @@ import vim -# import utils -plugin_root = vim.eval("s:plugin_root") -vim.command(f"py3file {plugin_root}/py/utils.py") - -prompt = vim.eval("l:prompt") -config = make_config(vim.eval("l:config")) -config_options = config['options'] -config_ui = config['ui'] - -engine = config['engine'] -is_selection = vim.eval("l:is_selection") - -def complete_engine(prompt): - openai_options = make_openai_options(config_options) - http_options = make_http_options(config_options) - printDebug("[engine-complete] text:\n" + prompt) - - request = { - 'prompt': prompt, - **openai_options - } - printDebug("[engine-complete] request: {}", request) - url = config_options['endpoint_url'] - response = openai_request(url, request, http_options) - def map_chunk(resp): - printDebug("[engine-complete] response: {}", resp) - return resp['choices'][0].get('text', '') - text_chunks = map(map_chunk, response) - return text_chunks - -def chat_engine(prompt): - initial_prompt = config_options.get('initial_prompt', []) - initial_prompt = '\n'.join(initial_prompt) - chat_content = f"{initial_prompt}\n\n>>> user\n\n{prompt}".strip() - messages = parse_chat_messages(chat_content) - printDebug("[engine-chat] text:\n" + chat_content) - return make_chat_text_chunks(messages, config_options) - -engines = {"chat": chat_engine, "complete": complete_engine} - -try: - if prompt: - print('Completing...') - vim.command("redraw") - text_chunks = engines[engine](prompt) - render_text_chunks(text_chunks, is_selection) - clear_echo_message() -except BaseException as error: - handle_completion_error(error) - printDebug("[complete] error: {}", traceback.format_exc()) +complete_py_imported = True + +def run_ai_completition(context): + prompt = context['prompt'] + config = make_config(context['config']) + config_options = config['options'] + config_ui = config['ui'] + + engine = config['engine'] + + def complete_engine(prompt): + openai_options = make_openai_options(config_options) + http_options = make_http_options(config_options) + print_debug("[engine-complete] text:\n" + prompt) + + request = { + 'prompt': prompt, + **openai_options + } + print_debug("[engine-complete] request: {}", request) + url = config_options['endpoint_url'] + response = openai_request(url, request, http_options) + def map_chunk(resp): + print_debug("[engine-complete] response: {}", resp) + return resp['choices'][0].get('text', '') + text_chunks = map(map_chunk, response) + return text_chunks + + def chat_engine(prompt): + initial_prompt = config_options.get('initial_prompt', []) + initial_prompt = '\n'.join(initial_prompt) + chat_content = f"{initial_prompt}\n\n>>> user\n\n{prompt}".strip() + messages = parse_chat_messages(chat_content) + print_debug("[engine-chat] text:\n" + chat_content) + return make_chat_text_chunks(messages, config_options) + + engines = {"chat": chat_engine, "complete": complete_engine} + + try: + if prompt: + print('Completing...') + vim.command("redraw") + text_chunks = engines[engine](prompt) + render_text_chunks(text_chunks) + clear_echo_message() + except BaseException as error: + handle_completion_error(error) + print_debug("[complete] error: {}", traceback.format_exc()) diff --git a/py/config.py b/py/config.py deleted file mode 100644 index 7739b27..0000000 --- a/py/config.py +++ /dev/null @@ -1,131 +0,0 @@ -import vim -import re -import os -import configparser - -def unwrap(input_var): - return vim.eval(input_var) - -def merge_deep_recursive(target, source = {}): - source = source.copy() - for key, value in source.items(): - if isinstance(value, dict): - target_child = target.setdefault(key, {}) - merge_deep_recursive(target_child, value) - else: - target[key] = value - return target - -def merge_deep(objects): - result = {} - for o in objects: - merge_deep_recursive(result, o) - return result - -def enhance_roles_with_custom_function(roles): - if vim.eval("exists('g:vim_ai_roles_config_function')") == '1': - roles_config_function = vim.eval("g:vim_ai_roles_config_function") - if not vim.eval("exists('*" + roles_config_function + "')"): - raise Exception(f"Role config function does not exist: {roles_config_function}") - else: - roles.update(vim.eval(roles_config_function + "()")) - -def load_role_config(role): - roles_config_path = os.path.expanduser(vim.eval("g:vim_ai_roles_config_file")) - if not os.path.exists(roles_config_path): - raise Exception(f"Role config file does not exist: {roles_config_path}") - - roles = configparser.ConfigParser() - roles.read(roles_config_path) - roles = dict(roles) - - enhance_roles_with_custom_function(roles) - - if not role in roles: - raise Exception(f"Role `{role}` not found") - - options = roles.get(f"{role}.options", {}) - options_complete = roles.get(f"{role}.options-complete", {}) - options_chat = roles.get(f"{role}.options-chat", {}) - - ui = roles.get(f"{role}.ui", {}) - ui_complete = roles.get(f"{role}.ui-complete", {}) - ui_chat = roles.get(f"{role}.ui-chat", {}) - - return { - 'role': dict(roles[role]), - 'config_default': { - 'options': dict(options), - 'ui': dict(ui), - }, - 'config_complete': { - 'options': dict(options_complete), - 'ui': dict(ui_complete), - }, - 'config_chat': { - 'options': dict(options_chat), - 'ui': dict(ui_chat), - }, - } - -def parse_role_names(prompt): - chunks = re.split(r'[ :]+', prompt) - roles = [] - for chunk in chunks: - if not chunk.startswith("/"): - break - roles.append(chunk) - return [raw_role[1:] for raw_role in roles] - -def parse_prompt_and_role_config(user_instruction, command_type): - user_instruction = user_instruction.strip() - roles = parse_role_names(user_instruction) - if not roles: - # does not require role - return (user_instruction, '', {}) - - last_role = roles[-1] - user_prompt = user_instruction[user_instruction.index(last_role) + len(last_role):].strip() # strip roles - - role_configs = merge_deep([load_role_config(role) for role in roles]) - config = merge_deep([role_configs['config_default'], role_configs['config_' + command_type]]) - role_prompt = role_configs['role'].get('prompt', '') - return user_prompt, role_prompt, config - -def make_selection_prompt(user_selection, user_prompt, role_prompt, selection_boundary): - if not user_prompt and not role_prompt: - return user_selection - elif user_selection: - if selection_boundary and selection_boundary not in user_selection: - return f"{selection_boundary}\n{user_selection}\n{selection_boundary}" - else: - return user_selection - return '' - -def make_prompt(role_prompt, user_prompt, user_selection, selection_boundary): - user_prompt = user_prompt.strip() - delimiter = ":\n" if user_prompt and user_selection else "" - user_selection = make_selection_prompt(user_selection, user_prompt, role_prompt, selection_boundary) - prompt = f"{user_prompt}{delimiter}{user_selection}" - if not role_prompt: - return prompt - delimiter = '' if prompt.startswith(':') else ':\n' - prompt = f"{role_prompt}{delimiter}{prompt}" - return prompt - -def make_config_and_prompt(params): - config_default = params['config_default'] - config_extension = params['config_extension'] - user_instruction = params['user_instruction'] - user_selection = params['user_selection'] - command_type = params['command_type'] - - user_prompt, role_prompt, role_config = parse_prompt_and_role_config(user_instruction, command_type) - final_config = merge_deep([config_default, config_extension, role_config]) - selection_boundary = final_config['options']['selection_boundary'] - prompt = make_prompt(role_prompt, user_prompt, user_selection, selection_boundary) - - return { - 'config': final_config, - 'prompt': prompt, - } diff --git a/py/context.py b/py/context.py new file mode 100644 index 0000000..254cd3b --- /dev/null +++ b/py/context.py @@ -0,0 +1,125 @@ +import vim +import re +import os +import configparser + +if "PYTEST_VERSION" in os.environ: + from utils import * + +context_py_imported = True + +def merge_deep_recursive(target, source = {}): + source = source.copy() + for key, value in source.items(): + if isinstance(value, dict): + target_child = target.setdefault(key, {}) + merge_deep_recursive(target_child, value) + else: + target[key] = value + return target + +def merge_deep(objects): + result = {} + for o in objects: + merge_deep_recursive(result, o) + return result + +def load_role_config(role): + roles_config_path = os.path.expanduser(vim.eval("g:vim_ai_roles_config_file")) + if not os.path.exists(roles_config_path): + raise Exception(f"Role config file does not exist: {roles_config_path}") + + roles = configparser.ConfigParser() + roles.read(roles_config_path) + roles = dict(roles) + + enhance_roles_with_custom_function(roles) + + if not role in roles: + raise Exception(f"Role `{role}` not found") + + options = roles.get(f"{role}.options", {}) + options_complete = roles.get(f"{role}.options-complete", {}) + options_chat = roles.get(f"{role}.options-chat", {}) + + ui = roles.get(f"{role}.ui", {}) + ui_complete = roles.get(f"{role}.ui-complete", {}) + ui_chat = roles.get(f"{role}.ui-chat", {}) + + return { + 'role': dict(roles[role]), + 'config_default': { + 'options': dict(options), + 'ui': dict(ui), + }, + 'config_complete': { + 'options': dict(options_complete), + 'ui': dict(ui_complete), + }, + 'config_chat': { + 'options': dict(options_chat), + 'ui': dict(ui_chat), + }, + } + +def parse_role_names(prompt): + chunks = re.split(r'[ :]+', prompt) + roles = [] + for chunk in chunks: + if not chunk.startswith("/"): + break + roles.append(chunk) + return [raw_role[1:] for raw_role in roles] + +def parse_prompt_and_role_config(user_instruction, command_type): + user_instruction = user_instruction.strip() + roles = parse_role_names(user_instruction) + if not roles: + # does not require role + return (user_instruction, '', {}) + + last_role = roles[-1] + user_prompt = user_instruction[user_instruction.index(last_role) + len(last_role):].strip() # strip roles + + role_configs = merge_deep([load_role_config(role) for role in roles]) + config = merge_deep([role_configs['config_default'], role_configs['config_' + command_type]]) + role_prompt = role_configs['role'].get('prompt', '') + return user_prompt, role_prompt, config + +def make_selection_prompt(user_selection, user_prompt, role_prompt, selection_boundary): + if not user_prompt and not role_prompt: + return user_selection + elif user_selection: + if selection_boundary and selection_boundary not in user_selection: + return f"{selection_boundary}\n{user_selection}\n{selection_boundary}" + else: + return user_selection + return '' + +def make_prompt(role_prompt, user_prompt, user_selection, selection_boundary): + user_prompt = user_prompt.strip() + delimiter = ":\n" if user_prompt and user_selection else "" + user_selection = make_selection_prompt(user_selection, user_prompt, role_prompt, selection_boundary) + prompt = f"{user_prompt}{delimiter}{user_selection}" + if not role_prompt: + return prompt + delimiter = '' if prompt.startswith(':') else ':\n' + prompt = f"{role_prompt}{delimiter}{prompt}" + return prompt + +def make_ai_context(params): + config_default = params['config_default'] + config_extension = params['config_extension'] + user_instruction = params['user_instruction'] + user_selection = params['user_selection'] + command_type = params['command_type'] + + user_prompt, role_prompt, role_config = parse_prompt_and_role_config(user_instruction, command_type) + final_config = merge_deep([config_default, config_extension, role_config]) + selection_boundary = final_config['options']['selection_boundary'] + prompt = make_prompt(role_prompt, user_prompt, user_selection, selection_boundary) + + return { + 'config': final_config, + 'prompt': prompt, + } diff --git a/py/roles.py b/py/roles.py index b7d19e1..16aa4e9 100644 --- a/py/roles.py +++ b/py/roles.py @@ -1,23 +1,17 @@ import vim -# import utils -plugin_root = vim.eval("s:plugin_root") -vim.command(f"py3file {plugin_root}/py/utils.py") +roles_py_imported = True -roles_config_path = os.path.expanduser(vim.eval("g:vim_ai_roles_config_file")) -if not os.path.exists(roles_config_path): - raise Exception(f"Role config file does not exist: {roles_config_path}") +def load_ai_role_names(): + roles_config_path = os.path.expanduser(vim.eval("g:vim_ai_roles_config_file")) + if not os.path.exists(roles_config_path): + raise Exception(f"Role config file does not exist: {roles_config_path}") -roles = configparser.ConfigParser() -roles.read(roles_config_path) + roles = configparser.ConfigParser() + roles.read(roles_config_path) -enhance_roles_with_custom_function(roles) + enhance_roles_with_custom_function(roles) -role_names = [name for name in roles.sections() if not '.' in name] + role_names = [name for name in roles.sections() if not '.' in name] -role_list = [f'"{name}"' for name in role_names] -role_list = ", ".join(role_list) - -role_list = f"[{role_list}]" - -vim.command(f'let l:role_list = {role_list}') + return role_names diff --git a/py/utils.py b/py/utils.py index ae32bc2..ce95ed6 100644 --- a/py/utils.py +++ b/py/utils.py @@ -13,12 +13,17 @@ from urllib.error import HTTPError import traceback import configparser -is_debugging = vim.eval("g:vim_ai_debug") == "1" -debug_log_file = vim.eval("g:vim_ai_debug_log_file") +utils_py_imported = True + +def is_ai_debugging(): + return vim.eval("g:vim_ai_debug") == "1" class KnownError(Exception): pass +def unwrap(input_var): + return vim.eval(input_var) + def load_api_key(config_token_file_path): # token precedence: config file path, global file path, env variable global_token_file_path = vim.eval("g:vim_ai_token_file_path") @@ -72,32 +77,19 @@ def make_http_options(options): 'token_file_path': options['token_file_path'], } -# During text manipulation in Vim's visual mode, we utilize "normal! c" command. This command deletes the highlighted text, -# immediately followed by entering insert mode where it generates desirable text. - -# Normally, Vim contemplates the position of the first character in selection to decide whether to place the entered text -# before or after the cursor. For instance, if the given line is "abcd", and "abc" is selected for deletion and "1234" is -# written in its place, the result is as expected "1234d" rather than "d1234". However, if "bc" is chosen for deletion, the -# achieved output is "a1234d", whereas "1234ad" is not. - -# Despite this, post Vim script's execution of "normal! c", it takes an exit immediately returning to the normal mode. This -# might trigger a potential misalignment issue especially when the most extreme left character is the line’s second character. - -# To avoid such pitfalls, the method "need_insert_before_cursor" checks not only the selection status, but also the character -# at the first position of the highlighting. If the selection is off or the first position is not the second character in the line, -# it determines no need for prefixing the cursor. -def need_insert_before_cursor(is_selection): - if is_selection == False: - return False +# when running AIEdit on selection and cursor ends on the first column, it needs to +# be decided whether to append (a) or insert (i) to prevent missalignment. +# Example: helloxxxhhhvb:AIE translate - expected Holaxxx, not xHolaxx +def need_insert_before_cursor(): pos = vim.eval("getpos(\"'<\")[1:2]") if not isinstance(pos, list) or len(pos) != 2: raise ValueError("Unexpected getpos value, it should be a list with two elements") return pos[1] == "1" # determines if visual selection starts on the first window column -def render_text_chunks(chunks, is_selection): +def render_text_chunks(chunks): generating_text = False full_text = '' - insert_before_cursor = need_insert_before_cursor(is_selection) + insert_before_cursor = need_insert_before_cursor() for text in chunks: if not generating_text: text = text.lstrip() # trim newlines from the beginning @@ -200,10 +192,10 @@ def vim_break_undo_sequence(): # breaks undo sequence (https://vi.stackexchange.com/a/29087) vim.command("let &ul=&ul") -def printDebug(text, *args): - if not is_debugging: +def print_debug(text, *args): + if not is_ai_debugging(): return - with open(debug_log_file, "a") as file: + with open(vim.eval("g:vim_ai_debug_log_file"), "a") as file: message = text.format(*args) if len(args) else text file.write(f"[{datetime.datetime.now()}] " + message + "\n") @@ -301,7 +293,7 @@ def make_chat_text_chunks(messages, config_options): 'messages': messages, **openai_options } - printDebug("[engine-chat] request: {}", request) + print_debug("[engine-chat] request: {}", request) url = config_options['endpoint_url'] response = openai_request(url, request, http_options) @@ -315,11 +307,11 @@ def make_chat_text_chunks(messages, config_options): return choices def map_chunk_no_stream(resp): - printDebug("[engine-chat] response: {}", resp) + print_debug("[engine-chat] response: {}", resp) return _choices(resp)[0].get('message', {}).get('content', '') def map_chunk_stream(resp): - printDebug("[engine-chat] response: {}", resp) + print_debug("[engine-chat] response: {}", resp) return _choices(resp)[0].get('delta', {}).get('content', '') map_chunk = map_chunk_stream if openai_options['stream'] else map_chunk_no_stream -- cgit v1.2.3