import lmstudio as lms
import re
import os
import shutil
from git import Blob, Repo, GitCommandError, Tree
import datetime
# Directory the selected repository is cloned into (see clone_repo_to_temp).
repo_path = "repos"
# Name of the review output directory.
# NOTE(review): nothing in this file reads this constant; the functions below
# hard-code the "reviews/" prefix instead.
reviews_path = "reviews"
# Identifier of the LM Studio model used for the reviews.
model_to_use = "qwen3-32b-mlx" # alternative kept for reference: "qwen/qwen2.5-coder-32b"
# The model handle is created at import time, so merely importing this module
# already talks to LM Studio.
# NOTE(review): "contextLength" is presumably the context window in tokens and
# a GPU "ratio" of 1.0 presumably means full GPU offload - confirm against the
# LM Studio SDK documentation.
model = lms.llm(model_to_use,
config={
"contextLength": 32768,
"gpu": {
"ratio": 1.0,
}
})
def recreate_dir(dir_path: str):
    """Make sure *dir_path* exists afterwards as an empty directory.

    Whatever currently lives at that location is removed first: a directory
    is deleted together with its content, a regular file is unlinked.
    """
    already_a_directory = os.path.isdir(dir_path)
    if not already_a_directory and os.path.isfile(dir_path):
        # A plain file blocks the directory name; get it out of the way.
        os.remove(dir_path)
    if already_a_directory:
        shutil.rmtree(dir_path)
    os.makedirs(dir_path, exist_ok=True)
def clone_repo_to_temp() -> str | None:
    """Ask the user for a Git URL and clone that repository into ``repo_path``.

    The target directory is wiped and recreated before cloning.

    Returns:
        ``repo_path`` when the clone succeeded, otherwise ``None`` (no URL
        entered, or cloning failed - in the failure case the partially
        created clone directory is removed again).
    """
    try:
        repo_url = input("Bitte gib die URL des Git-Repos ein: ").strip()
        if not repo_url:
            print("Keine URL eingegeben.")
            return None
        recreate_dir(repo_path)
        print(f"Klone Repository '{repo_url}' nach {repo_path}...")
        Repo.clone_from(repo_url, repo_path)
        print(f"Repository wurde erfolgreich nach {repo_path} geklont.")
        # Report success explicitly; previously success and failure both
        # yielded None and were indistinguishable for the caller.
        return repo_path
    except GitCommandError as e:
        print(f"Fehler beim Klonen des Repositories: {e}")
        shutil.rmtree(repo_path, ignore_errors=True)
        return None
    except Exception as e:
        print(f"✗ Ein unerwarteter Fehler ist aufgetreten: {e}")
        shutil.rmtree(repo_path, ignore_errors=True)
        return None
def list_branches() -> str:
    """Print the local branches of the cloned repository and let the user pick one.

    The prompt is repeated until a valid list number is entered.

    Returns:
        The name of the selected branch.

    Raises:
        ValueError: If the repository has no local branches to choose from.
    """
    repo = Repo(repo_path)
    branches = [branch.name for branch in repo.heads]
    if not branches:
        raise ValueError("Das Repository enthält keine lokalen Branches.")
    print("\nVerfügbare Branches:")
    for number, branch in enumerate(branches, start=1):
        print(f"{number}. {branch}")
    while True:
        raw_choice = input("Wähle eine Branch aus (Nummer): ").strip()
        # Only accept 1..len(branches). Without this check "0" would become
        # index -1 and silently select the last branch, and non-numeric input
        # would abort the whole run with a ValueError.
        if raw_choice.isdecimal() and 1 <= int(raw_choice) <= len(branches):
            return branches[int(raw_choice) - 1]
        print(f"Ungültige Auswahl. Bitte eine Zahl zwischen 1 und {len(branches)} eingeben.")
def select_commit(branch_name: str) -> str:
    """Print the history of *branch_name* (oldest first) and let the user pick a commit.

    The prompt is repeated until a valid list number is entered.

    Args:
        branch_name: Branch whose commits are offered for selection.

    Returns:
        The full hex SHA of the selected commit.

    Raises:
        ValueError: If the branch contains no commits.
    """
    repo = Repo(repo_path)
    commits = list(repo.iter_commits(branch_name))
    if not commits:
        raise ValueError(f"Die Branch {branch_name} enthält keine Commits.")
    print(f"\nCommit-Verlauf für {branch_name}:")
    # The commit list is reversed so that number 1 is the oldest commit.
    commits.reverse()
    for number, commit in enumerate(commits, start=1):
        print(f"{number}. {commit.hexsha[:7]} - {commit.message.strip()}")
    while True:
        raw_choice = input("Wähle einen Commit aus (Nummer): ").strip()
        # Only accept 1..len(commits). Without this check "0" would become
        # index -1 and silently select the last listed commit, and
        # non-numeric input would abort the whole run with a ValueError.
        if raw_choice.isdecimal() and 1 <= int(raw_choice) <= len(commits):
            return commits[int(raw_choice) - 1].hexsha
        print(f"Ungültige Auswahl. Bitte eine Zahl zwischen 1 und {len(commits)} eingeben.")
def checkout_commit_to_temp(commit_hash: str) -> str | None:
    """Create a ``temp-branch-<short sha>`` branch at *commit_hash* and check it out.

    Args:
        commit_hash: Hex SHA of the commit to check out.

    Returns:
        The name of the temporary branch on success, ``None`` if the checkout
        failed (the error is printed).
    """
    try:
        repo = Repo(repo_path)
        branch_name = f"temp-branch-{commit_hash[:7]}"
        temp_head = repo.create_head(branch_name, commit=commit_hash)
        temp_head.checkout()
        # Previously nothing was returned although the signature promised a
        # string, so callers could not tell success from failure.
        return branch_name
    except GitCommandError as e:
        print(f"Fehler beim Checkout: {e}")
        return None
    except Exception as e:
        print(f"Ein unerwarteter Fehler ist aufgetreten: {e}")
        # Unchanged from the original: an unexpected error discards the clone.
        shutil.rmtree(repo_path, ignore_errors=True)
        return None
def collect_files_in_tree(tree: Tree) -> list[str]:
    """Return the paths of all blobs below *tree*, descending into sub-trees.

    Entries are visited in iteration order; the paths of a sub-tree appear at
    the position of that sub-tree. Entries that are neither ``Blob`` nor
    ``Tree`` are skipped.
    """
    collected: list[str] = []
    for entry in tree:
        if isinstance(entry, Blob):
            collected.append(str(entry.path))
            continue
        if isinstance(entry, Tree):
            collected += collect_files_in_tree(entry)
    return collected
def get_files_in_commit(commit_hash: str) -> list[str]:
    """List the paths of all files contained in the tree of *commit_hash*.

    Any error while opening the repository or resolving the commit is printed
    and results in an empty list.
    """
    try:
        commit_tree = Repo(repo_path).commit(commit_hash).tree
        return collect_files_in_tree(commit_tree)
    except Exception as e:
        print(f"Fehler beim Abruf der Commit-Dateien: {e}")
    return []
def remove_think_tags(text: str) -> str:
    """Strip every ``<think>...</think>`` section (tags included) from *text*.

    Matching is non-greedy and spans line breaks; an opening tag without a
    closing tag is left untouched.
    """
    think_section = re.compile(r'<think>.*?</think>', re.DOTALL)
    return think_section.sub('', text)
def get_llm_response(code_content: str) -> str:
    """Send *code_content* to the loaded model and return its cleaned answer.

    The text of ``prompts.prp`` (read from the current working directory) is
    passed to ``lms.Chat`` as the initial prompt, the code follows as a user
    message, and ``<think>`` sections are removed from the reply.

    Args:
        code_content: Source code to be reviewed.

    Returns:
        The model response without ``<think>...</think>`` sections.

    Raises:
        FileNotFoundError: If ``prompts.prp`` could not be read.
    """
    prompts = read_code_file("prompts.prp")
    if prompts is None:
        # read_code_file signals a missing file with None; handing that to
        # lms.Chat would run the review without the intended instructions.
        raise FileNotFoundError("Die Prompt-Datei prompts.prp konnte nicht gelesen werden.")
    chat = lms.Chat(prompts)
    chat.add_user_message(code_content)
    response_message = model.respond(chat)
    return remove_think_tags(response_message.content)
def read_gitignore_file() -> list[str]:
    """Read the pattern lines of the cloned repository's top-level ``.gitignore``.

    Returns:
        The patterns with surrounding whitespace removed; blank lines and
        ``#`` comment lines are dropped. An empty list if the repository has
        no ``.gitignore``.
    """
    gitignore_file_name = f"{repo_path}/.gitignore"
    if not os.path.isfile(gitignore_file_name):
        return []
    # errors="replace": a stray undecodable byte must not abort the whole run.
    with open(gitignore_file_name, "r", encoding="utf-8", errors="replace") as gitignore_file:
        stripped_lines = (line.strip() for line in gitignore_file)
        # Comments and blank lines are not patterns; keeping them caused
        # false matches when a comment happened to mention a path.
        return [line for line in stripped_lines if line and not line.startswith("#")]
def read_code_file(file_path: str, file_name: str = "", gitignore_files: list[str] = []) -> str:
if not os.path.isfile(file_path):
print(f"Die Datei {file_path} existiert nicht.")
return None
_, ext = os.path.splitext(file_path)
valid_extensions = ['.py', '.js', '.java', '.cpp', '.prp', '.cs', '.html', '.css', '.scss', '.ts', '.json']
if file_name != "":
base_file_name = os.path.basename(file_name)
check_file_name = file_path.replace(repo_path, "")
check_file_path = check_file_name.replace(base_file_name, "")[:-1]
for gitignore_file in gitignore_files:
if gitignore_file.__contains__(file_name) or (len(check_file_path) > 0 and gitignore_file.__contains__(check_file_path)):
print(f"Datei {file_name} wird ignoriert, da sie Teil der .gitignore ist.")
return None
if ext.lower() not in valid_extensions:
print(f"Datei {file_name} hat eine nicht unterstützte Dateiendung {ext}. Unterstützte Formate: {', '.join(valid_extensions)}")
return None
else:
with open(file_path, 'r') as code_file:
return code_file.read()
def delete_temp_branches():
    """Delete all local ``temp-branch-*`` branches of the cloned repository.

    If one of these branches is currently checked out, HEAD is detached
    first, because Git refuses to delete the checked-out branch. Git errors
    are printed instead of raised.
    """
    repo = Repo(repo_path)
    matching_branches = [
        branch.name for branch in repo.heads
        if branch.name.startswith("temp-branch-")
    ]
    try:
        # active_branch is only defined while HEAD points to a branch.
        if not repo.head.is_detached and repo.active_branch.name in matching_branches:
            repo.git.checkout("--detach")
        if matching_branches:
            # force=True: the temporary branches are usually not merged.
            repo.delete_head(*matching_branches, force=True)
    except GitCommandError as e:
        print(f"Fehler beim Löschen der Branches: {e}")
        return
    print(f"Diese Branches: {matching_branches} wurden gelöscht.")
def write_html_file(html_code: str, commit: str, code_filename: str, reviews_dir: str = "reviews") -> str:
    """Write the review *html_code* for one source file below ``<reviews_dir>/<commit>/``.

    Every occurrence of ``script.py`` in *html_code* is replaced by
    *code_filename* before writing. The output name is *code_filename* with
    all dots replaced by underscores plus ``.html``; directory parts of
    *code_filename* are kept and created as needed.

    Args:
        html_code: Review text to store.
        commit: Commit identifier used as sub-directory name.
        code_filename: Repository-relative path of the reviewed file.
        reviews_dir: Root directory of all reviews.

    Returns:
        The path of the written HTML file.
    """
    html_file_name = f"{code_filename.replace('.', '_')}.html"
    html_code = html_code.replace("script.py", code_filename)
    target_path = os.path.join(reviews_dir, commit, html_file_name)
    # Files from sub-directories (e.g. "src/app/main.py") need their parent
    # directories; without this open() failed for everything but root files.
    os.makedirs(os.path.dirname(target_path), exist_ok=True)
    with open(target_path, "w", encoding="utf-8") as html_file:
        html_file.write(html_code)
    return target_path
def calc_time_duration(starting: datetime.datetime, ending: datetime.datetime) -> str:
    """Format the time between *starting* and *ending* as ``HH:MM:SS``.

    Fractions of a second are truncated. Hours are not wrapped at 24, so long
    durations yield more than two hour digits.

    Args:
        starting: Start timestamp.
        ending: End timestamp.

    Returns:
        The elapsed time as zero-padded ``HH:MM:SS`` string.
    """
    total_seconds = int((ending - starting).total_seconds())
    hours, remaining_seconds = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remaining_seconds, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
def _review_file(file: str, commit_hash: str, gitignore_files: list[str]) -> None:
    """Review a single repository file and store the result as HTML; unreadable or skipped files are ignored."""
    file_to_review = f"{repo_path}/{file}"
    code_content = read_code_file(file_to_review, file, gitignore_files)
    if code_content is None:
        return
    start_time_file = datetime.datetime.now()
    print(f"\nReviewe Datei {file} ({start_time_file})...")
    review_result = get_llm_response(code_content)
    write_html_file(review_result, commit_hash, file)
    end_time_file = datetime.datetime.now()
    file_review_duration = calc_time_duration(start_time_file, end_time_file)
    print(f"\nReview der Datei {file} abgeschlossen ({end_time_file}). Dauer: {file_review_duration}")


def main():
    """Run the interactive review workflow.

    Clones the repository, lets the user choose a branch and a commit, checks
    the commit out, reviews every supported file of that commit with the
    language model, and finally removes the clone again - also when a step
    fails.
    """
    repo_available = False
    try:
        clone_repo_to_temp()
        # The clone step reports its own errors; here it only matters whether
        # a usable repository is present.
        repo_available = os.path.isdir(os.path.join(repo_path, ".git"))
        if not repo_available:
            return
        selected_branch = list_branches()
        print(f"\nAussuchen von Commits in Branch '{selected_branch}'...")
        commit_hash = select_commit(selected_branch)
        print(f"\nCheckout des Commits {commit_hash[:7]}...")
        checkout_commit_to_temp(commit_hash)
        # File contents are read from the working tree, so reviewing is only
        # meaningful if HEAD really points at the selected commit.
        if Repo(repo_path).head.commit.hexsha != commit_hash:
            print(f"Commit {commit_hash[:7]} konnte nicht ausgecheckt werden. Review wird abgebrochen.")
            return
        files = get_files_in_commit(commit_hash)
        start_time_commit = datetime.datetime.now()
        print(f"\nBeginne Code Reviews des Commits {commit_hash[:7]} ({start_time_commit})...")
        recreate_dir(f"reviews/{commit_hash}")
        gitignore_files = read_gitignore_file()
        for file in files:
            try:
                _review_file(file, commit_hash, gitignore_files)
            except Exception as e:
                # One failing file must not stop the remaining reviews. The
                # message names the file because the failure can come from
                # reading, the model call, or writing the result.
                print(f"Fehler beim Review der Datei {file}: {e}")
        end_time_commit = datetime.datetime.now()
        commit_duration = calc_time_duration(start_time_commit, end_time_commit)
        print(f"\nAbschluss Code Review des Commits {commit_hash[:7]} ({end_time_commit}). Dauer: {commit_duration}")
    except Exception as e:
        print(f"Fehler: {e}")
    finally:
        # Clean up on every exit path so a failed run leaves no clone behind.
        if repo_available and os.path.isdir(repo_path):
            try:
                delete_temp_branches()
            except Exception as e:
                print(f"Fehler: {e}")
            shutil.rmtree(repo_path, ignore_errors=True)


if __name__ == "__main__":
    main()