1
0
forked from clan/clan-core

matrix-bot: Working openai integration

This commit is contained in:
Luis Hebendanz 2024-07-02 15:28:48 +02:00
parent 7a888fbbae
commit f6e77f3c1b
7 changed files with 334 additions and 26 deletions

View File

@ -2,12 +2,20 @@
python3,
setuptools,
matrix-nio,
aiofiles,
aiohttp,
markdown2,
...
}:
let
pythonDependencies = [ matrix-nio ];
pythonDependencies = [
matrix-nio
aiofiles
aiohttp
markdown2
];
runtimeDependencies = [ ];

View File

@ -13,6 +13,7 @@ from matrix_bot.matrix import MatrixData
log = logging.getLogger(__name__)
curr_dir = Path(__file__).parent
data_dir = curr_dir / "data"
def create_parser(prog: str | None = None) -> argparse.ArgumentParser:
@ -70,6 +71,13 @@ def create_parser(prog: str | None = None) -> argparse.ArgumentParser:
default="https://git.clan.lol",
)
parser.add_argument(
"--data-dir",
help="The directory to store data",
default=data_dir,
type=Path,
)
return parser
@ -102,8 +110,10 @@ def main() -> None:
access_token=os.getenv("GITEA_ACCESS_TOKEN"),
)
args.data_dir.mkdir(parents=True, exist_ok=True)
try:
asyncio.run(bot_main(matrix, gitea))
asyncio.run(bot_main(matrix, gitea, args.data_dir))
except KeyboardInterrupt:
print("User Interrupt", file=sys.stderr)

View File

@ -0,0 +1,148 @@
import asyncio
import datetime
import logging
import subprocess
from pathlib import Path
import json
import aiohttp
from nio import (
AsyncClient,
JoinResponse,
)
from matrix_bot.gitea import (
GiteaData,
)
from .matrix import MatrixData, send_message
from .openai import create_jsonl_file, upload_and_process_file
log = logging.getLogger(__name__)
def write_file_with_date_prefix(content: str, directory: Path, suffix: str) -> Path:
    """
    Write content to a file whose name is prefixed with the current date.

    :param content: The content to write to the file.
    :param directory: The directory where the file will be saved (created if missing).
    :param suffix: Suffix placed after the date prefix in the filename.
    :return: The path to the created file.
    """
    # Ensure the target directory exists before writing.
    directory.mkdir(parents=True, exist_ok=True)

    # NOTE: uses the naive local date, so runs that cross midnight produce a
    # file with a new prefix.
    current_date = datetime.datetime.now().strftime("%Y-%m-%d")
    file_path = directory / f"{current_date}_{suffix}.txt"

    # pathlib handles open/close and truncation for us.
    file_path.write_text(content)
    return file_path
async def git_pull(repo_path: Path) -> None:
    """
    Run ``git pull`` in the given repository checkout.

    :param repo_path: Path to a local git checkout.
    :raises Exception: If the git process exits with a non-zero status.
    """
    cmd = ["git", "pull"]
    process = await asyncio.create_subprocess_exec(
        *cmd,
        cwd=str(repo_path),
    )
    await process.wait()
    # Fail loudly instead of silently continuing with a stale checkout;
    # matches the error handling style of git_log().
    if process.returncode != 0:
        raise Exception(
            f"Command '{' '.join(cmd)}' failed with exit code {process.returncode}"
        )
async def git_log(repo_path: str) -> str:
    """
    Return the repository's commit log for the past week.

    Produces one-line headers formatted as
    ``<short-hash> - <author>, <relative-date> : <subject>`` plus per-commit
    stat and patch output.

    :param repo_path: Path to a local git checkout.
    :return: The captured stdout of ``git log``, decoded as text.
    :raises Exception: If the git process exits with a non-zero status.
    """
    cmd = [
        "git",
        "log",
        "--since=1 week ago",
        "--pretty=format:%h - %an, %ar : %s",
        "--stat",
        "--patch",
    ]
    process = await asyncio.create_subprocess_exec(
        *cmd,
        cwd=repo_path,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await process.communicate()
    if process.returncode != 0:
        # Include git's own diagnostics — a bare exit code is hard to debug.
        raise Exception(
            f"Command '{' '.join(cmd)}' failed with exit code {process.returncode}: "
            f"{stderr.decode(errors='replace')}"
        )
    return stdout.decode()
async def changelog_bot(
    client: AsyncClient,
    http: aiohttp.ClientSession,
    matrix: MatrixData,
    gitea: GiteaData,
    data_dir: Path,
) -> None:
    """
    Post a weekly changelog for the configured Gitea repository to a Matrix room.

    Clones the repository on first run (pulls afterwards), collects the last
    week's git log, submits it to OpenAI's Batch API for summarization, writes
    the raw LLM result to a date-prefixed file in ``data_dir``, and sends the
    combined changelog text to the Matrix room.

    :param client: Logged-in Matrix client used to join the room and send messages.
    :param http: Shared aiohttp session used for the OpenAI HTTP calls.
    :param matrix: Matrix settings; ``matrix.room`` is the target room.
    :param gitea: Gitea settings; ``url``/``owner``/``repo`` locate the repository.
    :param data_dir: Directory holding the cloned repo and generated artifacts.
    :raises Exception: If the room cannot be joined, or a git/OpenAI step fails.
    """
    # Join the target room; the bot must have been invited for this to succeed.
    room: JoinResponse = await client.join(matrix.room)
    if not room.transport_response.ok:
        log.error("This can happen if the room doesn't exist or the bot isn't invited")
        raise Exception(f"Failed to join room {room}")

    repo_path = data_dir / gitea.repo
    # First run: clone the repository into data_dir. Subsequent runs only pull.
    if not repo_path.exists():
        cmd = [
            "git",
            "clone",
            f"{gitea.url}/{gitea.owner}/{gitea.repo}.git",
            gitea.repo,
        ]
        subprocess.run(cmd, cwd=data_dir, check=True)

    # Bring the checkout up to date.
    await git_pull(repo_path)

    # Collect the past week's history (NOTE(review): git_log is annotated to
    # take str but receives a Path here — works, but worth aligning).
    diff = await git_log(repo_path)

    system_prompt = """
Generate a concise changelog for the past week,
focusing only on new features and summarizing bug fixes into a single entry.
Ensure the following:
- Deduplicate entries
- Discard uninteresting changes
- Keep the summary as brief as possible
- Use present tense
The changelog is as follows:
---
"""

    jsonl_path = data_dir / "changelog.jsonl"

    # Step 1: Create the JSONL file
    await create_jsonl_file(
        user_prompt=diff, system_prompt=system_prompt, jsonl_path=jsonl_path
    )

    # Step 2: Upload the JSONL file and process it
    results = await upload_and_process_file(session=http, jsonl_path=jsonl_path)

    # Keep the raw batch output on disk for debugging / auditing.
    result_file = write_file_with_date_prefix(json.dumps(results, indent=4), data_dir, "result")
    log.info(f"LLM result written to: {result_file}")

    # Join all changelogs with a separator (e.g., two newlines)
    all_changelogs = []
    for result in results:
        choices = result["response"]["body"]["choices"]
        changelog = "\n".join(choice["message"]["content"] for choice in choices)
        all_changelogs.append(changelog)
    full_changelog = "\n\n".join(all_changelogs)

    await send_message(client, room, full_changelog)

View File

@ -1,24 +1,25 @@
import asyncio
import logging
from pathlib import Path
import aiohttp
from matrix_bot.gitea import GiteaData
from matrix_bot.matrix import MatrixData
log = logging.getLogger(__name__)
curr_dir = Path(__file__).parent
from nio import AsyncClient, ClientConfig, ProfileGetAvatarResponse, RoomMessageText
from matrix_bot.bot import bot_run, message_callback
from matrix_bot.matrix import set_avatar, upload_image
from .changelog_bot import changelog_bot
from .gitea import GiteaData
from .matrix import MatrixData, set_avatar, upload_image
from .review_bot import message_callback, review_requested_bot
async def bot_main(
matrix: MatrixData,
gitea: GiteaData,
data_dir: Path,
) -> None:
# Setup client configuration to handle encryption
client_config = ClientConfig(
@ -41,7 +42,10 @@ async def bot_main(
try:
async with aiohttp.ClientSession() as session:
await bot_run(client, session, matrix, gitea)
while True:
await changelog_bot(client, session, matrix, gitea, data_dir)
await review_requested_bot(client, session, matrix, gitea, data_dir)
await asyncio.sleep(60 * 5)
except Exception as e:
log.exception(e)
finally:

View File

@ -2,9 +2,9 @@ import logging
from pathlib import Path
log = logging.getLogger(__name__)
from dataclasses import dataclass
from markdown2 import markdown
from nio import (
AsyncClient,
JoinedMembersResponse,
@ -32,9 +32,6 @@ async def set_avatar(client: AsyncClient, mxc_url: str) -> None:
raise Exception(f"Failed to set avatar {response}")
from nio import AsyncClient
async def get_room_members(client: AsyncClient, room: JoinResponse) -> list[RoomMember]:
users: JoinedMembersResponse = await client.joined_members(room.room_id)
@ -52,7 +49,9 @@ async def send_message(
"""
Send a message in a Matrix room, optionally mentioning users.
"""
# If user_ids are provided, format the message to mention them
formatted_message = markdown(message)
if user_ids:
mention_list = ", ".join(
[
@ -60,17 +59,13 @@ async def send_message(
for user_id in user_ids
]
)
body = f"{mention_list}: {message}"
formatted_body = f"{mention_list}: {message}"
else:
body = message
formatted_body = message
formatted_message = f"{mention_list}: {formatted_message}"
content = {
"msgtype": "m.text",
"msgtype": "m.notice",
"format": "org.matrix.custom.html",
"body": body,
"formatted_body": formatted_body,
"body": message,
"formatted_body": formatted_message,
}
res: RoomSendResponse = await client.room_send(

View File

@ -0,0 +1,144 @@
import asyncio
import json
import logging
import os
from pathlib import Path
import aiohttp
log = logging.getLogger(__name__)
# The URL to which the request is sent
url: str = "https://api.openai.com/v1/chat/completions"
def api_key() -> str:
    """
    Return the OpenAI API key from the environment.

    :return: The value of the ``OPENAI_API_KEY`` environment variable.
    :raises Exception: If ``OPENAI_API_KEY`` is not set.
    """
    key = os.environ.get("OPENAI_API_KEY")
    if key is None:
        raise Exception("OPENAI_API_KEY not set. Please set it in your environment.")
    return key
from typing import Any
import aiofiles
async def create_jsonl_file(
    *,
    user_prompt: str,
    system_prompt: str,
    jsonl_path: Path,
    model: str = "gpt-4o",
    max_tokens: int = 1000,
) -> None:
    """
    Write a single-request JSONL file for OpenAI's Batch API.

    The file contains exactly one chat-completion request asking *model* to
    respond to *user_prompt* under the steering of *system_prompt*.

    :param user_prompt: The user message sent to the model.
    :param system_prompt: The system message steering the model.
    :param jsonl_path: Where the JSONL file is written (overwritten if present).
    :param model: The model to use for summarization.
    :param max_tokens: The maximum number of tokens for the summary.
    """
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    request_line = {
        "custom_id": "request-1",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": model,
            "messages": messages,
            "max_tokens": max_tokens,
        },
    }
    # One JSON object per line is the Batch API's expected JSONL format.
    async with aiofiles.open(jsonl_path, "w") as handle:
        await handle.write(json.dumps(request_line) + "\n")
async def upload_and_process_file(
    *, session: aiohttp.ClientSession, jsonl_path: Path, api_key: str | None = None
) -> list[dict[str, Any]]:
    """
    Upload a JSONL file to OpenAI's Batch API and wait for the results.

    Uploads the file, creates a batch job, polls its status every 10 seconds
    until it reaches a terminal state, then downloads and parses the output.

    :param session: An aiohttp.ClientSession object.
    :param jsonl_path: The path of the JSONL file to upload.
    :param api_key: OpenAI API key; defaults to the ``OPENAI_API_KEY``
        environment variable, resolved at call time.
    :return: The parsed JSONL result lines from the batch output file.
    :raises Exception: If any API call fails or the batch does not complete.
    """
    if api_key is None:
        # Resolve lazily: evaluating api_key() as a parameter default would
        # run at import time and crash whenever OPENAI_API_KEY is unset,
        # even for callers that pass an explicit key.
        if "OPENAI_API_KEY" not in os.environ:
            raise Exception("OPENAI_API_KEY not set. Please set it in your environment.")
        api_key = os.environ["OPENAI_API_KEY"]

    # Step 1: Upload the JSONL file to OpenAI's Files API
    async with aiofiles.open(jsonl_path, "rb") as f:
        file_data = await f.read()

    upload_url = "https://api.openai.com/v1/files"
    headers = {
        "Authorization": f"Bearer {api_key}",
    }
    data = aiohttp.FormData()
    data.add_field(
        "file", file_data, filename=jsonl_path.name, content_type="application/jsonl"
    )
    data.add_field("purpose", "batch")

    async with session.post(upload_url, headers=headers, data=data) as response:
        if response.status != 200:
            raise Exception(f"File upload failed with status code {response.status}")
        upload_response = await response.json()
        file_id = upload_response.get("id")
        if not file_id:
            raise Exception("File ID not returned from upload")

    # Step 2: Create a batch using the uploaded file ID
    batch_url = "https://api.openai.com/v1/batches"
    batch_data = {
        "input_file_id": file_id,
        "endpoint": "/v1/chat/completions",
        "completion_window": "24h",
    }

    async with session.post(batch_url, headers=headers, json=batch_data) as response:
        if response.status != 200:
            raise Exception(f"Batch creation failed with status code {response.status}")
        batch_response = await response.json()
        batch_id = batch_response.get("id")
        if not batch_id:
            raise Exception("Batch ID not returned from creation")

    # Step 3: Check the status of the batch until completion.
    # NOTE(review): there is no overall timeout here; a batch stuck in
    # "in_progress" will poll forever. Acceptable for a bot loop, but worth
    # confirming.
    status_url = f"https://api.openai.com/v1/batches/{batch_id}"
    while True:
        async with session.get(status_url, headers=headers) as response:
            if response.status != 200:
                raise Exception(
                    f"Failed to check batch status with status code {response.status}"
                )
            status_response = await response.json()
            status = status_response.get("status")
            if status in ["completed", "failed", "expired"]:
                break
        await asyncio.sleep(10)  # Wait before checking again

    if status != "completed":
        raise Exception(f"Batch processing failed with status: {status}")

    # Step 4: Retrieve the results
    output_file_id = status_response.get("output_file_id")
    output_url = f"https://api.openai.com/v1/files/{output_file_id}/content"

    async with session.get(output_url, headers=headers) as response:
        if response.status != 200:
            raise Exception(
                f"Failed to retrieve batch results with status code {response.status}"
            )
        # The output is JSONL: one JSON object per line.
        content = await response.text()
        results = [json.loads(line) for line in content.splitlines()]
        return results

View File

@ -29,11 +29,12 @@ async def message_callback(room: MatrixRoom, event: RoomMessageText) -> None:
)
async def bot_run(
async def review_requested_bot(
client: AsyncClient,
http: aiohttp.ClientSession,
matrix: MatrixData,
gitea: GiteaData,
data_dir: Path,
) -> None:
# If you made a new room and haven't joined as that user, you can use
room: JoinResponse = await client.join(matrix.room)
@ -50,7 +51,7 @@ async def bot_run(
pulls = await fetch_pull_requests(gitea, http, limit=50, state=PullState.ALL)
# Read the last updated pull request
last_updated_path = Path("last_updated.json")
last_updated_path = data_dir / "last_updated.json"
last_updated = read_locked_file(last_updated_path)
# Check if the pull request is mergeable and needs review
@ -79,12 +80,10 @@ async def bot_run(
)
await send_message(client, room, message, user_ids=ping_users)
# Write the new last updated pull request
write_locked_file(last_updated_path, last_updated)
# Write the new last updated pull request
write_locked_file(last_updated_path, last_updated)
# Time taken
tend = time.time()
tdiff = round(tend - tstart)
log.debug(f"Time taken: {tdiff}s")
# await client.sync_forever(timeout=30000) # milliseconds