# code_reviewer / main.py
import lmstudio as lms
import re
import os
import shutil
from git import Blob, RemoteReference, Repo, GitCommandError, Tree
import datetime

# Directory the target repository is cloned into; it is recreated by
# clone_repo_to_temp() and removed again at the end of main().
repo_path = "repos"
# Name of the directory for generated reviews.
# NOTE(review): the visible functions hard-code "reviews/" instead of reading
# this constant - confirm whether it is used elsewhere.
reviews_path = "reviews"
# Identifier of the model requested from LM Studio; the trailing comment names
# an alternative model.
model_to_use = "qwen3-32b-mlx" #"qwen/qwen2.5-coder-32b"

# Obtain the model handle once, at import time, via the lmstudio SDK.
# presumably "contextLength" is the context window in tokens and a GPU "ratio"
# of 1.0 requests full GPU offload - verify against the LM Studio documentation.
model = lms.llm(model_to_use,
    config={
        "contextLength": 32768,
        "gpu": {
        "ratio": 1.0,
        }
})

def recreate_dir(dir_path: str):
    """Replace whatever exists at *dir_path* with a fresh, empty directory.

    An existing directory is removed recursively, an existing regular file is
    deleted; afterwards the directory (including missing parents) is created.
    """
    # Ordered (check, remover) pairs: only the first matching check is applied.
    removers = (
        (os.path.isdir, shutil.rmtree),
        (os.path.isfile, os.remove),
    )
    for exists_as, remove in removers:
        if exists_as(dir_path):
            remove(dir_path)
            break

    os.makedirs(dir_path, exist_ok=True)
    
def clone_repo_to_temp() -> str | None:
    """Ask for a Git repository URL and clone it into ``repo_path``.

    Any previous content of ``repo_path`` is discarded before cloning. If the
    clone fails, the (possibly partially written) directory is removed again.

    Returns:
        ``repo_path`` when the clone succeeded, ``None`` when no URL was
        entered or the clone failed, so callers can tell the outcomes apart.
    """
    try:
        repo_url = input("Bitte gib die URL des Git-Repos ein: ").strip()

        if not repo_url:
            print("Keine URL eingegeben.")
            return None

        recreate_dir(repo_path)

        print(f"Klone Repository '{repo_url}' nach {repo_path}...")

        Repo.clone_from(repo_url, repo_path)

        print(f"Repository wurde erfolgreich nach {repo_path} geklont.")

        # Previously the success path fell through and returned None, which
        # was indistinguishable from the failure paths below.
        return repo_path

    except GitCommandError as e:
        print(f"Fehler beim Klonen des Repositories: {e}")
        shutil.rmtree(repo_path, ignore_errors=True)
        return None

    except Exception as e:
        print(f"✗ Ein unerwarteter Fehler ist aufgetreten: {e}")
        shutil.rmtree(repo_path, ignore_errors=True)
        return None

def list_branches() -> str:
    """List the remote branches of the cloned repository and let the user pick one.

    Returns:
        The name of the chosen remote-tracking reference as reported by
        GitPython (e.g. ``origin/main``).

    Raises:
        ValueError: If the remote exposes no branches to choose from.
    """
    repo = Repo(repo_path)

    # Refresh the remote-tracking references before listing them.
    repo.git.execute(["git", "fetch", "--all"])

    # Exclude the symbolic <remote>/HEAD entry by name. Dropping the first
    # list element instead would remove a real branch whenever HEAD is not
    # the first reference (or does not exist at all).
    branches = [
        ref.name for ref in repo.remote().refs
        if ref.remote_head != "HEAD"
    ]

    if not branches:
        raise ValueError("Keine Branches gefunden.")

    print("\nVerfügbare Branches:")

    for number, branch in enumerate(branches, start=1):
        print(f"{number}. {branch}")

    # Re-prompt until a valid number is entered. Without the range check an
    # input of 0 would silently select the last branch (index -1).
    while True:
        raw_choice = input("Wähle eine Branch aus (Nummer): ").strip()
        try:
            index = int(raw_choice) - 1
        except ValueError:
            index = -1

        if 0 <= index < len(branches):
            return branches[index]

        print(f"Ungültige Auswahl. Bitte eine Zahl zwischen 1 und {len(branches)} eingeben.")

def select_commit(branch_name: str) -> str:
    """Show the commit history of *branch_name* and let the user pick a commit.

    The commits are numbered in reverse of the order ``iter_commits`` yields
    them.

    Args:
        branch_name: Reference whose history is listed.

    Returns:
        The full hex SHA of the chosen commit.

    Raises:
        ValueError: If the branch has no commits to choose from.
    """
    repo = Repo(repo_path)

    commits = list(repo.iter_commits(branch_name))
    commits.reverse()

    if not commits:
        raise ValueError(f"Keine Commits in {branch_name} gefunden.")

    print(f"\nCommit-Verlauf für {branch_name}:")

    for number, commit in enumerate(commits, start=1):
        # Only the first line of the message, so multi-line commit messages
        # do not break up the numbered list.
        print(f"{number}. {commit.hexsha[:7]} - {commit.summary}")

    # Re-prompt until a valid number is entered. Without the range check an
    # input of 0 would silently select the last commit (index -1).
    while True:
        raw_choice = input("Wähle einen Commit aus (Nummer): ").strip()
        try:
            index = int(raw_choice) - 1
        except ValueError:
            index = -1

        if 0 <= index < len(commits):
            return commits[index].hexsha

        print(f"Ungültige Auswahl. Bitte eine Zahl zwischen 1 und {len(commits)} eingeben.")

def checkout_commit_to_temp(commit_hash: str) -> str | None:
    """Create a temporary branch at *commit_hash* and check it out.

    The branch is named ``temp-branch-<first 7 characters of the hash>``.

    Args:
        commit_hash: Hex SHA of the commit to check out.

    Returns:
        The name of the temporary branch on success, ``None`` on failure.
        (The function was annotated as returning ``str`` but never returned a
        value, so success could not be detected by callers.)
    """
    try:
        repo = Repo(repo_path)
        branch_name = f"temp-branch-{commit_hash[:7]}"

        temp_head = repo.create_head(branch_name, commit=commit_hash)
        temp_head.checkout()

        return branch_name

    except GitCommandError as e:
        print(f"Fehler beim Checkout: {e}")
        return None

    except Exception as e:
        print(f"Ein unerwarteter Fehler ist aufgetreten: {e}")
        # NOTE(review): an unexpected error discards the whole clone, while a
        # GitCommandError keeps it - kept as in the original; confirm intent.
        shutil.rmtree(repo_path, ignore_errors=True)
        return None

def collect_files_in_tree(tree: Tree) -> list[str]:
    """Return the paths of all blobs reachable from *tree*, depth first.

    Entries are visited in iteration order; sub-trees are expanded in place
    and entries that are neither ``Blob`` nor ``Tree`` are skipped.
    """
    def _walk(node):
        # Yield blob paths of this level and descend into nested trees.
        for entry in node:
            if isinstance(entry, Blob):
                yield str(entry.path)
            elif isinstance(entry, Tree):
                yield from _walk(entry)

    return list(_walk(tree))

def get_files_in_commit(commit_hash: str) -> list[str]:
    """Return the paths of the files touched by the given commit.

    For a commit with parents the list is derived from the diff against its
    first parent. A root commit has nothing to diff against, so every file in
    its tree is reported.

    Args:
        commit_hash: Hex SHA of the commit to inspect.

    Returns:
        A list of repository-relative paths; an empty list if the commit
        could not be inspected (the error is printed).
    """
    try:
        repo = Repo(repo_path)
        commit = repo.commit(commit_hash)

        if not commit.parents:
            # diff(None) compares against the working tree, not against
            # "nothing": with this commit checked out that diff is empty, so
            # the files of a root commit were never reviewed.
            return collect_files_in_tree(commit.tree)

        # The diff runs from the commit towards its first parent, so a_path
        # refers to the commit's side of each change.
        return [diff.a_path for diff in commit.diff(commit.parents[0])]

    except Exception as e:
        print(f"Fehler beim Abruf der Commit-Dateien: {e}")
        return []

def remove_think_tags(text: str) -> str:
    """Strip every ``<think>...</think>`` section (tags included) from *text*.

    Matching is non-greedy and spans line breaks; text without a complete
    tag pair is returned unchanged.
    """
    think_section = re.compile(r'<think>.*?</think>', flags=re.DOTALL)
    return think_section.sub('', text)
    
def get_llm_response(code_content: str) -> str:
    """Send *code_content* to the model and return its cleaned answer.

    The chat is initialised with the content of ``prompts.prp``, the code is
    added as the user message, and ``<think>`` sections are removed from the
    reply.

    Args:
        code_content: Source code to be reviewed.

    Returns:
        The model's response text without ``<think>`` sections.

    Raises:
        FileNotFoundError: If ``prompts.prp`` could not be read.
    """
    prompts = read_code_file("prompts.prp")
    if prompts is None:
        # read_code_file signals failure with None; passing that on would
        # start the chat without the review instructions.
        raise FileNotFoundError("Die Prompt-Datei prompts.prp konnte nicht gelesen werden.")

    chat = lms.Chat(prompts)
    chat.add_user_message(code_content)
    response_message = model.respond(chat)
    return remove_think_tags(response_message.content)

def read_gitignore_file() -> list[str]:
    """Return the pattern lines of the cloned repository's top-level ``.gitignore``.

    Surrounding whitespace (including the line break) is stripped; blank lines
    and ``#`` comment lines are left out so they are not treated as patterns.

    Returns:
        The remaining lines, or an empty list if there is no ``.gitignore``.
    """
    gitignore_file_name = f"{repo_path}/.gitignore"
    if not os.path.isfile(gitignore_file_name):
        return []

    # errors="replace" keeps an oddly encoded .gitignore from aborting the run.
    with open(gitignore_file_name, "r", encoding="utf-8", errors="replace") as gitignore_file:
        stripped_lines = (line.strip() for line in gitignore_file)
        return [entry for entry in stripped_lines if entry and not entry.startswith("#")]

def read_code_file(file_path: str, file_name: str = "", gitignore_files: list[str] = []) -> str:
    if not os.path.isfile(file_path):
        print(f"Die Datei {file_path} existiert nicht.")
        return None
    
    _, ext = os.path.splitext(file_path)
    valid_extensions = ['.py', '.js', '.java', '.cpp', '.prp', '.cs', '.html', '.css', '.scss', '.ts', '.json']

    if file_name != "":
        base_file_name = os.path.basename(file_name)
        check_file_name = file_path.replace(repo_path, "")
        check_file_path = check_file_name.replace(base_file_name, "")[:-1]

        for gitignore_file in gitignore_files:
            if gitignore_file.__contains__(file_name) or (len(check_file_path) > 0 and gitignore_file.__contains__(check_file_path)):
                print(f"Datei {file_name} wird ignoriert, da sie Teil der .gitignore ist.")
                return None
    
    if ext.lower() not in valid_extensions:
        print(f"Datei {file_name} hat eine nicht unterstützte Dateiendung {ext}. Unterstützte Formate: {', '.join(valid_extensions)}")
        return None    
    else:
        with open(file_path, 'r') as code_file:
            return code_file.read()

def delete_temp_branches():
    """Delete all local branches whose name starts with ``temp-branch-``.

    If HEAD currently points at one of these branches it is detached first,
    because Git refuses to delete the checked-out branch. A failing Git
    command is reported on the console instead of being raised.
    """
    repo = Repo(repo_path)

    matching_branches = [
        branch.name for branch in repo.heads
        if branch.name.startswith("temp-branch-")
    ]

    try:
        if matching_branches:
            head = repo.head
            if not head.is_detached and head.ref.name in matching_branches:
                repo.git.checkout("--detach")

            # The original only printed the message below without deleting
            # anything.
            repo.delete_head(*matching_branches, force=True)
    except GitCommandError as e:
        print(f"Fehler beim Löschen der Branches: {e}")
        return

    print(f"Diese Branches: {matching_branches} wurden gelöscht.")

def write_html_file(html_code: str, commit: str, code_filename: str) -> str:
    """Write a review to ``reviews/<commit>/<sanitised file name>.html``.

    Dots, slashes and backslashes in *code_filename* are replaced by
    underscores to form the file name, and every occurrence of the placeholder
    ``script.py`` in *html_code* is replaced by *code_filename*.

    Args:
        html_code: Review text to store.
        commit: Commit identifier used as the sub-directory name.
        code_filename: Repository-relative name of the reviewed file.

    Returns:
        The path of the written file.
    """
    # Built outside the f-string: reusing the same quote type inside an
    # f-string expression is a syntax error before Python 3.12.
    safe_name = code_filename.translate(str.maketrans("./\\", "___"))
    html_file_path = f"reviews/{commit}/{safe_name}.html"
    html_code = html_code.replace("script.py", code_filename)

    os.makedirs(os.path.dirname(html_file_path), exist_ok=True)

    # Explicit UTF-8: model output may contain characters the platform
    # default encoding cannot represent.
    with open(html_file_path, "w", encoding="utf-8") as html_file:
        html_file.write(html_code)

    return html_file_path

def calc_time_duration(starting: datetime.datetime, ending: datetime.datetime) -> str:
    """Format the time between two timestamps as ``HH:MM:SS``.

    Fractions of a second are truncated. Hours are not wrapped at 24, so
    longer durations produce hour values above 23.

    Args:
        starting: Start timestamp.
        ending: End timestamp.

    Returns:
        The elapsed time as a zero-padded ``HH:MM:SS`` string.
    """
    total_seconds = int((ending - starting).total_seconds())
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)

    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

def main():
    """Run the interactive review workflow.

    Steps: clone a repository, let the user pick a branch and a commit, check
    the commit out, send every changed and supported file to the model and
    store each review under ``reviews/<commit hash>/``. The clone is removed
    at the end, also when the run is aborted by an error.
    """
    try:
        clone_repo_to_temp()

        # The clone step prints its own error messages; the presence of the
        # .git directory tells us whether there is a repository to work with.
        if not os.path.isdir(os.path.join(repo_path, ".git")):
            print("Abbruch: Es wurde kein Repository geklont.")
            return

        selected_branch = list_branches()

        print(f"\nAussuchen von Commits in Branch '{selected_branch}'...")

        commit_hash = select_commit(selected_branch)

        print(f"\nCheckout des Commits {commit_hash[:7]}...")
        checkout_commit_to_temp(commit_hash)

        # The files are read from the working tree, so make sure the checkout
        # really moved HEAD to the selected commit before reviewing anything.
        if Repo(repo_path).head.commit.hexsha != commit_hash:
            print(f"Abbruch: Commit {commit_hash[:7]} konnte nicht ausgecheckt werden.")
            return

        files = get_files_in_commit(commit_hash)
        start_time_commit = datetime.datetime.now()

        print(f"\nBeginne Code Reviews des Commits {commit_hash[:7]} ({start_time_commit})...")
        print(f"\nIn diesem Commit wurden die folgenden Dateien geändert:")
        for file in files:
            print(f"- {file}")
        recreate_dir(f"reviews/{commit_hash}")

        gitignore_files = read_gitignore_file()

        for file in files:
            # One failing file must not abort the review of the others.
            try:
                file_to_review = f"{repo_path}/{file}"
                code_content = read_code_file(file_to_review, file, gitignore_files)
                if code_content is None:
                    continue
                start_time_file = datetime.datetime.now()

                print(f"\nReviewe Datei {file} ({start_time_file})...")

                review_result = get_llm_response(code_content)
                write_html_file(review_result, commit_hash, file)
                end_time_file = datetime.datetime.now()
                file_review_duration = calc_time_duration(start_time_file, end_time_file)

                print(f"\nReview der Datei {file} abgeschlossen ({end_time_file}). Dauer: {file_review_duration}")
            except Exception as e:
                # Covers reading, the model call and writing the result, so
                # the message names the file rather than claiming a read error.
                print(f"Fehler beim Review der Datei {file}: {e}")
        end_time_commit = datetime.datetime.now()
        commit_duration = calc_time_duration(start_time_commit, end_time_commit)

        print(f"\nAbschluss Code Review des Commits {commit_hash[:7]} ({end_time_commit}). Dauer: {commit_duration}")

        delete_temp_branches()

    except Exception as e:
        print(f"Fehler: {e}")

    finally:
        # Remove the clone on every exit path; previously an error left it
        # behind on disk.
        if os.path.isdir(repo_path):
            try:
                shutil.rmtree(repo_path)
            except OSError as e:
                print(f"Fehler: {e}")

# Start the interactive workflow only when this file is executed as a script.
if __name__ == "__main__":
    main()