forge-moderation-script/moderate.py

179 lines
5.4 KiB
Python
Executable File

#!/usr/bin/env python3
import datetime
import logging
import os
import pathlib
import sys
import requests
import yaml
# Load config file if it exists, or set as an empty dict
config_file = pathlib.Path(__file__).parent.joinpath("config.yml")
if pathlib.Path.exists(config_file):
config = yaml.safe_load(open(config_file).read())
else:
config = dict()
# set variables from environment or config
log_level = os.environ.get("MOD_LOG_LEVEL", config.get("log_level", "INFO"))
base_url = os.environ.get("MOD_BASE_URL", config.get("base_url"))
api_token = os.environ.get('MOD_API_TOKEN', config.get("api_token"))
orgs= os.environ.get("MOD_ORGS", config.get("orgs", list()))
repos= os.environ.get("MOD_REPOS", config.get("repos", list()))
block_users = os.environ.get("MOD_BLOCK_USERS", config.get("block_users", list()))
history = os.environ.get("MOD_HISTORY", config.get("history", dict(days=1)))
logging.basicConfig(format="[%(levelname)s] %(message)s", level=log_level)
logging.debug(f"log_level = {log_level}")
# Run checks on variables and make necessary changes
if isinstance(base_url, str):
base_url = f"{base_url.strip('/')}/api/v1"
else:
logging.critical("base_url not set!")
sys.exit(1)
logging.debug(f"base_url = {base_url}")
if api_token is None:
logging.critical("api_token not set!")
sys.exit(1)
if isinstance(orgs, str):
orgs = orgs.replace(" ", "").split(",")
logging.debug(f"orgs = {orgs}")
if isinstance(repos, str):
repos = repos.replace(" ", "").split(",")
logging.debug(f"repos = {repos}")
if isinstance(block_users, str):
block_users = block_users.replace(" ", "").split(",")
for i, entry in enumerate(block_users):
if ":" in entry:
block_users[i] = [ entry.split(":")[0], ":".join(entry.split(":")[1:]) ]
else:
block_users[i] = [ entry ]
logging.debug(f"block_users = {block_users}")
if isinstance(history, str):
history = dict((k.strip(), int(v.strip())) for k, v in
(item.split(":") for item in history.split(",")))
logging.debug(f"history = {history}")
s = requests.Session()
s.headers.update(Authorization=f"token {api_token}")
dt_hist = datetime.datetime.utcnow() - datetime.timedelta(**history)
dt_hist = f"{dt_hist.isoformat()}Z"
# get list of repos to moderate
repolist = list()
for org in orgs:
resp = s.get(f"{base_url}/orgs/{org}/repos")
if resp.ok:
for repo in resp.json():
repolist.append(repo)
logging.info(f"Watching repo {repo.get('full_name')}")
else:
logging.warning(f"Error getting org {org} ({resp.status_code} {resp.reason})")
for repo in repos:
resp = s.get(f"{base_url}/repo/{repo}")
if resp.ok:
repolist.append(resp.json())
logging.info(f"Watching repo {resp.json().get('full_name')}")
else:
logging.warning(f"Error getting repo {repo} ({resp.status_code} {resp.reason})")
if len(repolist) == 0:
logging.info("No repos to check. Exiting!")
sys.exit(0)
# list of users to moderate
userlist = list()
for user in block_users:
if len(user) > 1:
if datetime.datetime.fromisoformat(user[1]) < datetime.datetime.utcnow():
logging.warning(f"Block has expired for {user[0]}, skipping.")
continue
resp = s.get(f"{base_url}/users/{user[0]}")
if resp.ok:
userlist.append(resp.json())
logging.info(f"Blocking user {user[0]}.")
else:
logging.warning(f"Error getting user {user[0]} ({resp.status_code} {resp.reason})")
if len(userlist) == 0:
logging.info("No users to block. Exiting!")
sys.exit(0)
# remove issues created by blocked users
logging.info(f"Searching for issues and PRs created by blocked users.")
for repo in repolist:
repostr = f"/repos/{repo.get('full_name')}/issues"
for user in userlist:
data = dict(state="all", created_by=user.get("username"), since=dt_hist)
params = "&".join(f"{k}={v}" for k, v in data.items())
resp = s.get(f"{base_url}{repostr}?{params}")
if resp.ok:
for issue in resp.json():
logging.info(f"Found issue created by {user.get('username')}!")
logging.info(f"Deleting issue {issue.get('html_url')}")
dresp = s.delete(issue.get("url"))
if not dresp.ok:
logging.error(f"Error deleting {issue.get('html_url')} ({dresp.status_code} {dresp.reason})")
else:
logging.warning(f"{resp.url} {resp.status_code} {resp.reason}")
# remove comments added by moderated users
logging.info(f"Searching for comments created by blocked users.")
for repo in repolist:
commentstr = f"/repos/{repo.get('full_name')}/issues/comments"
for user in userlist:
resp = s.get(f"{base_url}{commentstr}?since={dt_hist}")
if resp.ok:
for comment in resp.json():
commentuser = comment.get("user").get("username")
if commentuser in [user.get("username") for user in userlist]:
logging.info(f"Found comment from {commentuser}!")
logging.info(f"Deleting comment {comment.get('html_url')}")
dresp = s.delete(f"{base_url}{commentstr}/{comment.get('id')}")
if not dresp.ok:
logging.error(f"Error deleting {comment.get('html_url')} ({dresp.status_code} {dresp.reason})")
else:
logging.warning(f"{resp.url} {resp.status_code} {resp.reason}")