forked from clan/clan-infra

Merge pull request 'matrix-bot: Trigger review bot on labels and assignees. Fix changelog bot dropping data' (#219) from Qubasa/clan-infra:Qubasa-main into main

clan-bot 2024-07-17 16:46:30 +00:00
commit fef5bada95
3 changed files with 227 additions and 136 deletions
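The changelog fix replaces hard truncation of an oversized prompt with token-based chunking, so long diffs are no longer silently cut off. A minimal sketch of that idea, mirroring the `split_message` helper added in this patch (assumes `tiktoken` is installed; the standalone wrapper shape is illustrative):

```python
import tiktoken

def split_message(content: str, max_tokens: int, model: str = "gpt-4o") -> list[str]:
    """Split content into chunks of at most max_tokens tokens instead of truncating."""
    encoder = tiktoken.encoding_for_model(model)
    content_tokens = encoder.encode(content)
    chunks: list[str] = []
    for i in range(0, len(content_tokens), max_tokens):
        chunk = content_tokens[i : i + max_tokens]
        chunks.append(encoder.decode(chunk))
    return chunks

# Each chunk becomes its own user message and its own batch request, so no part
# of a long diff is dropped. With max_response_tokens = 4096, the per-chunk
# budget used in the patch is 127_000 - 4_096 = 122_904 tokens.
```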

View File

@@ -18,7 +18,7 @@ from matrix_bot.gitea import (
from .locked_open import read_locked_file, write_locked_file
from .matrix import MatrixData, send_message
from .openai import create_jsonl_data, upload_and_process_file
from .openai import create_jsonl_data, upload_and_process_files
log = logging.getLogger(__name__)
@@ -173,22 +173,20 @@ Create a concise changelog
Follow these guidelines:
- Keep the summary brief
- Follow commit message format: "scope: message (#number1, #number2)"
- Follow the pull request format: "scope: message (#number1, #number2)"
- Don't use the commit messages tied to a pull request as is and instead explain the change in a user-friendly way
- Link pull requests as: '{gitea.url}/{gitea.owner}/{gitea.repo}/pulls/<number>'
- Use markdown links to make the pull request number clickable
- Mention each pull request number at most once
- Focus on the most interesting changes for end users
- Explain the impact of the changes in a user-friendly way
- Focus on new clan modules if any
- Always use four '#' for headings, never fewer than that. Example: `#### New Features`
---
Example Changelog:
### Changelog:
#### Changelog:
For the last {matrix.changelog_frequency} days from {fromdate} to {todate}
#### New Features
- `secrets`: added settings and generator submodules, improved tests [#1679]({gitea.url}/{gitea.owner}/{gitea.repo}/pulls/1679)
> Users can now generate secrets and manage settings in the new submodules
- `sshd`: added a workaround for CVE-2024-6387 [#1674]({gitea.url}/{gitea.owner}/{gitea.repo}/pulls/1674)
> A workaround has been added to mitigate the security vulnerability
- `secrets`: Users can now generate secrets and manage settings in the new submodules [#1679]({gitea.url}/{gitea.owner}/{gitea.repo}/pulls/1679)
- `sshd`: A workaround has been added to mitigate the security vulnerability [#1674]({gitea.url}/{gitea.owner}/{gitea.repo}/pulls/1674)
...
#### Refactoring
...
@@ -196,27 +194,19 @@ For the last {matrix.changelog_frequency} days from {fromdate} to {todate}
...
#### Bug Fixes
...
#### Other Changes
#### Additional Notes
...
---
### Changelog:
#### Changelog:
For the last {matrix.changelog_frequency} days from {fromdate} to {todate}
#### New Features
"""
# Step 1: Create the JSONL file
jsonl_data = await create_jsonl_data(user_prompt=diff, system_prompt=system_prompt)
jsonl_files = await create_jsonl_data(user_prompt=diff, system_prompt=system_prompt)
# Step 2: Upload the JSONL file and process it
results = await upload_and_process_file(session=http, jsonl_data=jsonl_data)
# Write the results to a file in the changelogs directory
result_file = write_file_with_date_prefix(
json.dumps(results, indent=4),
data_dir / "changelogs",
ndays=matrix.changelog_frequency,
suffix="result",
)
log.info(f"LLM result written to: {result_file}")
results = await upload_and_process_files(session=http, jsonl_files=jsonl_files)
# Join responses together
all_changelogs = []
@@ -226,6 +216,57 @@ For the last {matrix.changelog_frequency} days from {fromdate} to {todate}
all_changelogs.append(changelog)
full_changelog = "\n\n".join(all_changelogs)
log.info(f"Changelog generated:\n{full_changelog}")
combine_prompt = """
Please combine the following changelogs into a single markdown changelog.
- Merge the sections and remove any duplicates.
- Make sure the changelog is concise and easy to read.
---
Example Changelog:
#### Changelog:
For the last {matrix.changelog_frequency} days from {fromdate} to {todate}
#### New Features
- **inventory**:
- Initial support for deployment info for machines [#1767](https://git.clan.lol/clan/clan-core/pulls/1767)
- Automatic inventory schema checks and runtime assertions [#1753](https://git.clan.lol/clan/clan-core/pulls/1753)
- **webview**:
- Introduced block devices view and machine flashing UI [#1745](https://git.clan.lol/clan/clan-core/pulls/1745), [#1768](https://git.clan.lol/clan/clan-core/pulls/1768)
- Migration to solid-query for improved resource fetching & caching [#1755](https://git.clan.lol/clan/clan-core/pulls/1755)
...
#### Refactoring
...
#### Documentation
...
#### Bug Fixes
...
#### Additional Notes
...
---
#### Changelog:
For the last {matrix.changelog_frequency} days from {fromdate} to {todate}
#### New Features
"""
new_jsonl_files = await create_jsonl_data(
user_prompt=full_changelog, system_prompt=combine_prompt
)
new_results = await upload_and_process_files(
session=http, jsonl_files=new_jsonl_files
)
await send_message(client, room, full_changelog)
new_all_changelogs = []
for result in new_results:
choices = result["response"]["body"]["choices"]
changelog = "\n".join(choice["message"]["content"] for choice in choices)
new_all_changelogs.append(changelog)
new_full_changelog = "\n\n".join(new_all_changelogs)
log.info(f"Changelog generated:\n{new_full_changelog}")
# Write the results to a file in the changelogs directory
new_result_file = write_file_with_date_prefix(
json.dumps(new_results, indent=4),
data_dir / "changelogs",
ndays=matrix.changelog_frequency,
suffix="result",
)
log.info(f"LLM result written to: {new_result_file}")
await send_message(client, room, new_full_changelog)
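In short, the changelog bot now makes two passes over the Batch API: each diff chunk is summarized independently, and the joined summaries are submitted once more with the combine prompt to produce a single changelog for the room. A condensed sketch of that flow, assuming the helpers are imported from the bot's `openai` module and `http` is the bot's `aiohttp.ClientSession`:

```python
async def generate_changelog(http, diff: str, system_prompt: str, combine_prompt: str) -> str:
    # Pass 1: one Batch API request per diff chunk
    jsonl_files = await create_jsonl_data(user_prompt=diff, system_prompt=system_prompt)
    results = await upload_and_process_files(session=http, jsonl_files=jsonl_files)
    full_changelog = "\n\n".join(
        choice["message"]["content"]
        for result in results
        for choice in result["response"]["body"]["choices"]
    )

    # Pass 2: merge the per-chunk changelogs into one document
    new_jsonl_files = await create_jsonl_data(
        user_prompt=full_changelog, system_prompt=combine_prompt
    )
    new_results = await upload_and_process_files(session=http, jsonl_files=new_jsonl_files)
    return "\n\n".join(
        choice["message"]["content"]
        for result in new_results
        for choice in result["response"]["body"]["choices"]
    )
```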

View File

@@ -30,129 +30,160 @@ async def create_jsonl_data(
user_prompt: str,
system_prompt: str,
model: str = "gpt-4o",
max_tokens: int = 4096,
) -> bytes:
summary_request: dict[str, Any] = {
"custom_id": "request-1",
"method": "POST",
"url": "/v1/chat/completions",
"body": {
"model": model,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
"max_tokens": max_tokens,
},
}
dumped = json.dumps(summary_request)
encoder = tiktoken.encoding_for_model(model)
count_tokens: int = len(encoder.encode(dumped))
used_tokens = max_tokens + count_tokens + 1000
log.debug(f"Number of tokens in the JSONL data: {used_tokens}")
if used_tokens > 128_000:
# Cut off the excess tokens
tokens_to_remove: int = used_tokens - 128_000
message = summary_request["body"]["messages"][1]
content = message["content"]
max_response_tokens: int = 4096,
) -> list[bytes]:
def split_message(content: str, max_tokens: int) -> list[str]:
# Split the content into chunks of max_tokens
content_tokens = encoder.encode(content)
chunks = []
for i in range(0, len(content_tokens), max_tokens):
chunk = content_tokens[i : i + max_tokens]
chunks.append(encoder.decode(chunk))
log.debug(f"Chunk {i/max_tokens}: {len(chunk)} tokens")
return chunks
if len(content_tokens) > tokens_to_remove:
# Remove the excess tokens
encoded_content = content_tokens[:-tokens_to_remove]
log.debug(f"Removed {tokens_to_remove} tokens from the content")
# Decode the tokens back to string
content = encoder.decode(encoded_content)
summary_request["body"]["messages"][1]["content"] = content
encoder = tiktoken.encoding_for_model(model)
max_message_tokens = 127_000 - max_response_tokens
dumped = json.dumps(summary_request)
else:
raise Exception("Not enough tokens to remove")
# Split user_prompt into multiple user messages if it exceeds the max_message_tokens
user_messages = []
for message_chunk in split_message(user_prompt, max_message_tokens):
if len(message_chunk) == 0:
raise Exception("Empty message chunk")
user_messages.append({"role": "user", "content": message_chunk})
new_count_tokens: int = len(encoder.encode(dumped))
if new_count_tokens > 128_000:
raise Exception(f"Too many tokens in the JSONL data {new_count_tokens}")
## count number of tokens for every user message
count_tokens: int = 0
for i, message in enumerate(user_messages):
count_tokens = len(encoder.encode(message["content"]))
log.debug(f"Number of tokens in the user messages: {count_tokens}")
if count_tokens > max_message_tokens:
raise Exception(f"Too many tokens in the user message[{i}] {count_tokens}")
return dumped.encode("utf-8")
batch_jobs: list[bytes] = []
for message in user_messages:
summary_request: dict[str, Any] = {
"custom_id": "request-1",
"method": "POST",
"url": "/v1/chat/completions",
"body": {
"model": model,
"messages": [
{"role": "system", "content": system_prompt},
message,
],
"max_tokens": max_response_tokens,
},
}
dumped = json.dumps(summary_request)
batch_jobs.append(dumped.encode("utf-8"))
return batch_jobs
async def upload_and_process_file(
*, session: aiohttp.ClientSession, jsonl_data: bytes, api_key: str = api_key()
async def upload_and_process_files(
*,
session: aiohttp.ClientSession,
jsonl_files: list[bytes],
api_key: str = api_key(),
) -> list[dict[str, Any]]:
"""
Upload a JSONL file to OpenAI's Batch API and process it asynchronously.
Upload multiple JSONL files to OpenAI's Batch API and process them asynchronously.
"""
upload_url = "https://api.openai.com/v1/files"
headers = {
"Authorization": f"Bearer {api_key}",
}
data = aiohttp.FormData()
data.add_field(
"file", jsonl_data, filename="changelog.jsonl", content_type="application/jsonl"
)
data.add_field("purpose", "batch")
async with session.post(upload_url, headers=headers, data=data) as response:
if response.status != 200:
raise Exception(f"File upload failed with status code {response.status}")
upload_response = await response.json()
file_id = upload_response.get("id")
async def upload_file(jsonl_data: bytes) -> str:
upload_url = "https://api.openai.com/v1/files"
if not file_id:
raise Exception("File ID not returned from upload")
data = aiohttp.FormData()
data.add_field(
"file",
jsonl_data,
filename="changelog.jsonl",
content_type="application/jsonl",
)
data.add_field("purpose", "batch")
# Step 2: Create a batch using the uploaded file ID
batch_url = "https://api.openai.com/v1/batches"
batch_data = {
"input_file_id": file_id,
"endpoint": "/v1/chat/completions",
"completion_window": "24h",
}
async with session.post(batch_url, headers=headers, json=batch_data) as response:
if response.status != 200:
raise Exception(f"Batch creation failed with status code {response.status}")
batch_response = await response.json()
batch_id = batch_response.get("id")
if not batch_id:
raise Exception("Batch ID not returned from creation")
# Step 3: Check the status of the batch until completion
status_url = f"https://api.openai.com/v1/batches/{batch_id}"
while True:
async with session.get(status_url, headers=headers) as response:
async with session.post(upload_url, headers=headers, data=data) as response:
if response.status != 200:
raise Exception(
f"Failed to check batch status with status code {response.status}"
f"File upload failed with status code {response.status}"
)
status_response = await response.json()
status = status_response.get("status")
if status in ["completed", "failed", "expired"]:
break
await asyncio.sleep(10) # Wait before checking again
upload_response = await response.json()
file_id = upload_response.get("id")
if status != "completed":
raise Exception(f"Batch processing failed with status: {status}")
if not file_id:
raise Exception("File ID not returned from upload")
# Step 4: Retrieve the results
output_file_id = status_response.get("output_file_id")
output_url = f"https://api.openai.com/v1/files/{output_file_id}/content"
return file_id
async with session.get(output_url, headers=headers) as response:
if response.status != 200:
raise Exception(
f"Failed to retrieve batch results with status code {response.status} reason {response.reason}"
)
async def create_batch(file_id: str) -> str:
batch_url = "https://api.openai.com/v1/batches"
batch_data = {
"input_file_id": file_id,
"endpoint": "/v1/chat/completions",
"completion_window": "24h",
}
# Read content as text
content = await response.text()
async with session.post(
batch_url, headers=headers, json=batch_data
) as response:
if response.status != 200:
raise Exception(
f"Batch creation failed with status code {response.status}"
)
batch_response = await response.json()
batch_id = batch_response.get("id")
# Parse the content as JSONL
results = [json.loads(line) for line in content.splitlines()]
return results
if not batch_id:
raise Exception("Batch ID not returned from creation")
return batch_id
async def check_batch_status(batch_id: str) -> str:
status_url = f"https://api.openai.com/v1/batches/{batch_id}"
while True:
async with session.get(status_url, headers=headers) as response:
if response.status != 200:
raise Exception(
f"Failed to check batch status with status code {response.status}"
)
status_response = await response.json()
status = status_response.get("status")
if status in ["completed", "failed", "expired"]:
if status != "completed":
raise Exception(
f"Batch processing failed with status: {status}"
)
return status_response.get("output_file_id")
await asyncio.sleep(10)
async def retrieve_results(output_file_id: str) -> list[dict[str, Any]]:
output_url = f"https://api.openai.com/v1/files/{output_file_id}/content"
async with session.get(output_url, headers=headers) as response:
if response.status != 200:
raise Exception(
f"Failed to retrieve batch results with status code {response.status} reason {response.reason}"
)
content = await response.text()
results = [json.loads(line) for line in content.splitlines()]
return results
file_ids = await asyncio.gather(
*[upload_file(jsonl_data) for jsonl_data in jsonl_files]
)
batch_ids = await asyncio.gather(*[create_batch(file_id) for file_id in file_ids])
output_file_ids = await asyncio.gather(
*[check_batch_status(batch_id) for batch_id in batch_ids]
)
all_results = await asyncio.gather(
*[retrieve_results(output_file_id) for output_file_id in output_file_ids]
)
# Flatten the list of results
combined_results = [item for sublist in all_results for item in sublist]
return combined_results
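For reference, each JSONL file produced by `create_jsonl_data` holds a single chat-completion request, and each line of the batch output file is a result record whose completion is read back out of `response.body.choices`. Abridged examples of both shapes, limited to the fields this code actually writes or reads:

```python
# Request record serialized by create_jsonl_data (one line of an uploaded JSONL file):
request_line = {
    "custom_id": "request-1",
    "method": "POST",
    "url": "/v1/chat/completions",
    "body": {
        "model": "gpt-4o",
        "messages": [
            {"role": "system", "content": "<system prompt>"},
            {"role": "user", "content": "<one chunk of the diff>"},
        ],
        "max_tokens": 4096,
    },
}

# Result record parsed by retrieve_results (only the fields the changelog bot reads):
result_line = {
    "custom_id": "request-1",
    "response": {
        "body": {
            "choices": [
                {"message": {"role": "assistant", "content": "<generated changelog>"}}
            ]
        }
    },
}
```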

View File

@@ -68,30 +68,49 @@ async def review_requested_bot(
# and if the pull request is newer than the last updated pull request
for pull in pulls:
requested_reviewers = pull["requested_reviewers"]
pid = str(pull["id"])
if requested_reviewers and pull["mergeable"]:
last_time_updated = ping_hist.get(pid, {}).get(
assigned_users = pull["assignees"]
mentioned_users = []
if assigned_users:
mentioned_users.extend(assigned_users)
if requested_reviewers:
mentioned_users.extend(requested_reviewers)
mentioned_users = list(map(lambda x: x["login"].lower(), mentioned_users))
mentioned_users = list(
filter(lambda name: name not in matrix.user, mentioned_users)
)
pull_id = str(pull["id"])
needs_review_label = any(x["name"] == "needs-review" for x in pull["labels"])
if (
len(mentioned_users) > 0
and pull["mergeable"]
or needs_review_label
and pull["mergeable"]
):
last_time_updated = ping_hist.get(pull_id, {}).get(
"updated_at", datetime.datetime.min.isoformat()
)
if ping_hist == {} or pull["updated_at"] > last_time_updated:
ping_hist[pid] = pull
ping_hist[pull_id] = pull
else:
continue
# Check if the requested reviewers are in the room
requested_reviewers = [r["login"].lower() for r in requested_reviewers]
ping_users = []
for user in users:
if user.display_name.lower() in requested_reviewers:
if user.display_name.lower() in mentioned_users:
ping_users.append(user.user_id)
# Send a message to the room and mention the users
log.info(f"Pull request {pull['title']} needs review")
log.debug(
f"Mentioned users: {mentioned_users}, has needs-review label: {needs_review_label}"
)
message = f"Review Requested:\n[{pull['title']}]({pull['html_url']})"
await send_message(client, room, message, user_ids=ping_users)
# Write the new last updated pull request
write_locked_file(ping_hist_path, ping_hist)
# Write the new last updated pull request
write_locked_file(ping_hist_path, ping_hist)
# Time taken
tend = time.time()
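The review bot's new trigger fires for a mergeable pull request that either mentions someone (assignees or requested reviewers, excluding the bot's own user) or carries the `needs-review` label. A small predicate restating that condition as a sketch; the function name and `bot_users` parameter are illustrative:

```python
def should_ping(pull: dict, bot_users: list[str]) -> bool:
    """Return True if the review bot should ping the room for this pull request."""
    mentioned = [
        user["login"].lower()
        for user in (pull.get("assignees") or []) + (pull.get("requested_reviewers") or [])
        if user["login"].lower() not in bot_users
    ]
    needs_review = any(label["name"] == "needs-review" for label in pull.get("labels", []))
    # `and` binds tighter than `or`, so the condition in the hunk above reduces to:
    return bool(pull["mergeable"]) and (len(mentioned) > 0 or needs_review)
```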