import requests import argparse import time from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry # 使用方法: python clean-gitea-user.py --keep-users admin user1 --gitea-url http://gitea.example.com --api-token your_token # 解析命令行参数 parser = argparse.ArgumentParser(description='清理 Gitea 用户和仓库,保留指定的用户及其仓库。') parser.add_argument('--keep-users', nargs='+', required=True, help='要保留的用户列表,例如 --keep-users user1 user2') parser.add_argument('--gitea-url', required=True, help='Gitea 服务器的 URL,例如 http://localhost:3000') parser.add_argument('--api-token', required=True, help='Gitea API Token') args = parser.parse_args() # Gitea服务器 URL 和 API 访问 token gitea_url = args.gitea_url api_token = args.api_token # 获取所有用户的API请求 users_api = f'{gitea_url}/api/v1/admin/users' # 获取所有用户的 API URL headers = { 'Authorization': f'token {api_token}' # 设置认证头,使用 API Token } # 创建带有重试机制的会话 session = requests.Session() retry = Retry(total=5, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504]) adapter = HTTPAdapter(max_retries=retry) session.mount('http://', adapter) session.mount('https://', adapter) # 删除指定用户的仓库 def delete_user_repositories(username): """ 获取并删除指定用户的所有仓库。 :param username: 用户名 """ # 获取用户的仓库 repos_api = f'{gitea_url}/api/v1/users/{username}/repos' # 用户仓库 API URL response = session.get(repos_api, headers=headers) if response.status_code == 200: # 如果请求成功,遍历该用户的所有仓库 repos = response.json() for repo in repos: repo_name = repo['name'] # 获取仓库的名字 # 删除仓库 delete_repo_api = f'{gitea_url}/api/v1/repos/{username}/{repo_name}' # 删除仓库的 API URL delete_response = session.delete(delete_repo_api, headers=headers) if delete_response.status_code == 204: print(f'Repository {repo_name} deleted successfully.') else: print(f'Failed to delete repository {repo_name}.') # 删除仓库失败 time.sleep(0.5) # 添加延迟以避免速率限制 else: print(f'Failed to fetch repositories for {username}. Status code:', response.status_code) # 请求仓库失败 # 获取所有用户并删除除指定用户外的用户及其仓库 response = session.get(users_api, headers=headers) # 获取所有用户 if response.status_code == 200: users = response.json() # 将响应的 JSON 数据转换为 Python 对象 for user in users: username = user['username'] # 获取用户名 if username not in args.keep_users: # 如果用户不在保留列表中 print(f'Processing user: {username}') # 先删除该用户的仓库 delete_user_repositories(username) # 删除仓库 # 然后删除用户 delete_user_api = f'{gitea_url}/api/v1/admin/users/{username}' # 删除用户的 API URL delete_response = session.delete(delete_user_api, headers=headers) if delete_response.status_code == 204: print(f'User {username} deleted successfully.') else: print(f'Failed to delete user {username}. Status code: {delete_response.status_code}') # 删除用户失败 time.sleep(0.5) # 添加延迟以避免速率限制 else: print(f'User {username} is being kept.') # 用户被保留 else: print('Failed to fetch users. Status code:', response.status_code) # 获取用户失败