405 lines
15 KiB
Python
405 lines
15 KiB
Python
import time
|
||
import hashlib
|
||
import requests
|
||
from flask import Blueprint, render_template, jsonify, current_app, request
|
||
|
||
dingtalk_bp = Blueprint('dingtalk', __name__)
|
||
|
||
# 延迟导入避免循环引用
|
||
def get_user_model():
|
||
from models import User
|
||
return User
|
||
|
||
def get_dingtalk_config():
|
||
print('[后端] [1] ============== get_dingtalk_config 开始 ==============')
|
||
app_key = current_app.config.get('DINGTALK_APP_KEY', '')
|
||
agent_id = current_app.config.get('DINGTALK_AGENT_ID', '')
|
||
corp_id = current_app.config.get('DINGTALK_CORP_ID', '')
|
||
app_secret = current_app.config.get('DINGTALK_APP_SECRET', '')
|
||
target_url = current_app.config.get('DINGTALK_TARGET_URL', 'http://localhost:5001/requirement-collection')
|
||
|
||
print(f'[后端] [1] 配置读取: app_key={app_key}, agent_id={agent_id}, corp_id={corp_id}')
|
||
|
||
if not app_key or not app_secret:
|
||
print(f'[后端] [1] 缺少 app_key 或 app_secret, 返回 None')
|
||
return None
|
||
|
||
result = {
|
||
'app_key': app_key,
|
||
'app_secret': app_secret,
|
||
'agent_id': agent_id,
|
||
'corp_id': corp_id,
|
||
'target_url': target_url
|
||
}
|
||
print(f'[后端] [1] get_dingtalk_config 完成, 返回:', result)
|
||
return result
|
||
|
||
def get_access_token():
|
||
print('[后端] [2] ============== get_access_token 开始 ==============')
|
||
app_key = current_app.config.get('DINGTALK_APP_KEY')
|
||
app_secret = current_app.config.get('DINGTALK_APP_SECRET')
|
||
|
||
if not app_key or not app_secret:
|
||
print('[后端] [2] 缺少 app_key 或 app_secret, 返回 None')
|
||
return None
|
||
|
||
url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
|
||
headers = {'Content-Type': 'application/json'}
|
||
data = {'appKey': app_key, 'appSecret': app_secret}
|
||
print(f'[后端] [2] 请求钉钉API: {url}, data: appKey={app_key}')
|
||
|
||
try:
|
||
response = requests.post(url, json=data, headers=headers, timeout=10)
|
||
result = response.json()
|
||
print(f'[后端] [2] 钉钉响应:', result)
|
||
if result.get('accessToken'):
|
||
print(f'[后端] [2] 获取access_token成功')
|
||
return result['accessToken']
|
||
except Exception as e:
|
||
print(f'[后端] [2] 异常:', e)
|
||
return None
|
||
|
||
def get_jsapi_ticket():
|
||
print('[后端] [2b] ============== get_jsapi_ticket 开始 ==============')
|
||
app_key = current_app.config.get('DINGTALK_APP_KEY')
|
||
app_secret = current_app.config.get('DINGTALK_APP_SECRET')
|
||
|
||
if not app_key or not app_secret:
|
||
print('[后端] [2b] 缺少 app_key 或 app_secret, 返回 None')
|
||
return None
|
||
|
||
url = f'https://oapi.dingtalk.com/gettoken?appkey={app_key}&appsecret={app_secret}'
|
||
print(f'[后端] [2b] 请求钉钉API: {url}')
|
||
|
||
try:
|
||
response = requests.get(url, timeout=10)
|
||
result = response.json()
|
||
print(f'[后端] [2b] 获取token响应:', result)
|
||
if result.get('access_token'):
|
||
access_token = result['access_token']
|
||
ticket_url = f'https://oapi.dingtalk.com/get_jsapi_ticket?access_token={access_token}'
|
||
print(f'[后端] [2b] 请求ticket API: {ticket_url}')
|
||
ticket_response = requests.get(ticket_url, timeout=10)
|
||
ticket_result = ticket_response.json()
|
||
print(f'[后端] [2b] 获取ticket响应:', ticket_result)
|
||
if ticket_result.get('ticket'):
|
||
print(f'[后端] [2b] 获取jsapi_ticket成功')
|
||
return ticket_result['ticket']
|
||
except Exception as e:
|
||
print(f'[后端] [2b] 异常:', e)
|
||
return None
|
||
|
||
def generate_signature(jsapi_ticket, nonce_str, timestamp, url):
|
||
print(f'[后端] [2c] ============== generate_signature 开始 ==============')
|
||
print(f'[后端] [2c] 参数: jsapi_ticket={jsapi_ticket[:20] if jsapi_ticket else "None"}..., nonce_str={nonce_str}, timestamp={timestamp}, url={url}')
|
||
|
||
sign_str = f'jsapi_ticket={jsapi_ticket}&noncestr={nonce_str}×tamp={timestamp}&url={url}'
|
||
print(f'[后端] [2c] 签名字符串: {sign_str}')
|
||
|
||
signature = hashlib.sha1(sign_str.encode('utf-8')).hexdigest()
|
||
print(f'[后端] [2c] 签名结果: {signature}')
|
||
return signature
|
||
|
||
def get_user_info_by_code(code):
|
||
print(f'[后端] [3] ============== get_user_info_by_code 开始, code={code} ==============')
|
||
access_token = get_access_token()
|
||
if not access_token:
|
||
print('[后端] [3] 获取 access_token 失败, 返回 None')
|
||
return None
|
||
|
||
# 使用钉钉微应用API通过临时授权码获取用户信息
|
||
url = f'https://oapi.dingtalk.com/topapi/v2/user/getuserinfo?access_token={access_token}'
|
||
print(f'[后端] [3] 请求钉钉API: {url}')
|
||
|
||
headers = {'Content-Type': 'application/json'}
|
||
data = {'code': code}
|
||
|
||
try:
|
||
response = requests.post(url, json=data, headers=headers, timeout=10)
|
||
result = response.json()
|
||
print(f'[后端] [3] 钉钉用户信息响应:', result)
|
||
return result
|
||
except Exception as e:
|
||
print(f'[后端] [3] 异常:', e)
|
||
return None
|
||
|
||
def get_user_detail(user_id):
|
||
print(f'[后端] [4] ============== get_user_detail 开始, user_id={user_id} ==============')
|
||
access_token = get_access_token()
|
||
if not access_token:
|
||
print('[后端] [4] 获取 access_token 失败, 返回 None')
|
||
return None
|
||
|
||
# 使用新版API获取用户详情
|
||
url = f'https://oapi.dingtalk.com/topapi/v2/user/get?access_token={access_token}'
|
||
print(f'[后端] [4] 请求钉钉API: {url}')
|
||
|
||
headers = {'Content-Type': 'application/json'}
|
||
data = {'userid': user_id}
|
||
|
||
try:
|
||
response = requests.post(url, json=data, headers=headers, timeout=10)
|
||
result = response.json()
|
||
print(f'[后端] [4] 钉钉用户详情响应:', result)
|
||
return result
|
||
except Exception as e:
|
||
print(f'[后端] [4] 异常:', e)
|
||
return None
|
||
|
||
def get_dept_name(dept_id):
|
||
print(f'[后端] [5] ============== get_dept_name 开始, dept_id={dept_id} ==============')
|
||
access_token = get_access_token()
|
||
if not access_token:
|
||
print('[后端] [5] 获取 access_token 失败, 返回 None')
|
||
return None
|
||
|
||
# 使用新版API获取部门信息
|
||
url = f'https://oapi.dingtalk.com/topapi/v2/department/get?access_token={access_token}'
|
||
print(f'[后端] [5] 请求钉钉API: {url}')
|
||
|
||
headers = {'Content-Type': 'application/json'}
|
||
data = {'dept_id': int(dept_id)}
|
||
|
||
try:
|
||
response = requests.post(url, json=data, headers=headers, timeout=10)
|
||
result = response.json()
|
||
print(f'[后端] [5] 钉钉部门信息响应:', result)
|
||
if result.get('errcode') == 0 and result.get('result') and result['result'].get('name'):
|
||
print(f'[后端] [5] 获取部门名称成功: {result["result"]["name"]}')
|
||
return result['result']['name']
|
||
except Exception as e:
|
||
print(f'[后端] [5] 异常:', e)
|
||
return None
|
||
|
||
def send_dingtalk_card_to_user(user_id, title, content, jump_url):
|
||
"""发送钉钉消息卡片给指定用户"""
|
||
print(f'[后端] [N] ============== send_dingtalk_card_to_user 开始, user_id={user_id} ==============')
|
||
access_token = get_access_token()
|
||
if not access_token:
|
||
print('[后端] [N] 获取 access_token 失败')
|
||
return False
|
||
|
||
url = f'https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2?access_token={access_token}'
|
||
headers = {'Content-Type': 'application/json'}
|
||
|
||
agent_id = current_app.config.get('DINGTALK_AGENT_ID', '')
|
||
data = {
|
||
'agent_id': int(agent_id) if agent_id else 0,
|
||
'userid_list': user_id,
|
||
'msg': {
|
||
'msgtype': 'link',
|
||
'link': {
|
||
'title': title,
|
||
'text': content,
|
||
'picUrl': 'https://img.alicdn.com/tfs/TB1NwmBkeL2gK0jSZPhXXahviXa-72-72.png',
|
||
'messageUrl': jump_url
|
||
}
|
||
}
|
||
}
|
||
|
||
print(f'[后端] [N] 发送消息卡片: {data}')
|
||
|
||
try:
|
||
response = requests.post(url, json=data, headers=headers, timeout=10)
|
||
result = response.json()
|
||
print(f'[后端] [N] 钉钉响应:', result)
|
||
if result.get('errcode') == 0:
|
||
print(f'[后端] [N] 消息发送成功')
|
||
return True
|
||
else:
|
||
print(f'[后端] [N] 消息发送失败: {result.get("errmsg", "未知错误")}')
|
||
return False
|
||
except Exception as e:
|
||
print(f'[后端] [N] 异常:', e)
|
||
return False
|
||
|
||
def notify_admins_new_demand(demand, base_url):
|
||
"""通知所有管理员有新需求提交"""
|
||
print(f'[后端] [N] ============== notify_admins_new_demand 开始 ==============')
|
||
|
||
# 获取所有管理员用户
|
||
from models import User
|
||
admins = User.query.filter_by(role='admin').all()
|
||
print(f'[后端] [N] 找到 {len(admins)} 个管理员')
|
||
|
||
if not admins:
|
||
print('[后端] [N] 没有管理员用户,跳过通知')
|
||
return
|
||
|
||
# 构建消息内容
|
||
title = f'📋 新需求提交:{demand.title}'
|
||
content = f'''需求标题:{demand.title}
|
||
|
||
提交者:{demand.contact or "未知"}
|
||
|
||
分会:{demand.branch}
|
||
|
||
点击查看并回答'''.strip()
|
||
|
||
# 回答页面的跳转链接
|
||
jump_url = f'{base_url}/demand/{demand.id}/answer'
|
||
|
||
success_count = 0
|
||
for admin in admins:
|
||
if admin.dingtalk_userid:
|
||
if send_dingtalk_card_to_user(admin.dingtalk_userid, title, content, jump_url):
|
||
success_count += 1
|
||
|
||
print(f'[后端] [N] 消息发送完成,成功: {success_count}/{len(admins)}')
|
||
|
||
def notify_asker_answer(demand, base_url):
|
||
"""通知提问者问题已被回答"""
|
||
print(f'[后端] [N] ============== notify_asker_answer 开始 ==============')
|
||
|
||
# 获取提问者
|
||
User = get_user_model()
|
||
asker = User.query.get(demand.user_id)
|
||
if not asker:
|
||
print('[后端] [N] 未找到提问者,跳过通知')
|
||
return
|
||
|
||
if not asker.dingtalk_userid:
|
||
print(f'[后端] [N] 提问者 {asker.username} 没有钉钉userid,跳过通知')
|
||
return
|
||
|
||
# 构建消息内容
|
||
title = f'✅ 您的需求已被回答:{demand.title}'
|
||
content = f'''需求标题:{demand.title}
|
||
|
||
管理员已回复您的问题,点击查看详情'''.strip()
|
||
|
||
# 跳转链接 - 指向首页
|
||
jump_url = f'{base_url}' + '/dingtalk'
|
||
|
||
print(f'[后端] [N] 准备通知提问者: {asker.username}, dingtalk_userid: {asker.dingtalk_userid}')
|
||
|
||
if send_dingtalk_card_to_user(asker.dingtalk_userid, title, content, jump_url):
|
||
print(f'[后端] [N] 提问者通知发送成功')
|
||
else:
|
||
print(f'[后端] [N] 提问者通知发送失败')
|
||
|
||
@dingtalk_bp.route('/')
|
||
def dingtalk_entry():
|
||
print('[后端] [0] ============== dingtalk_entry 请求到达 ==============')
|
||
|
||
config = get_dingtalk_config()
|
||
|
||
if not config:
|
||
print('[后端] [0] 配置为空,使用默认值')
|
||
return render_template('dingtalk_entry.html',
|
||
app_key='',
|
||
agent_id='',
|
||
corp_id='',
|
||
target_url='http://localhost:5001/requirement-collection'
|
||
), 400
|
||
|
||
print('[后端] [0] 渲染 dingtalk_entry.html')
|
||
return render_template('dingtalk_entry.html',
|
||
app_key=config['app_key'],
|
||
agent_id=config['agent_id'],
|
||
corp_id=config['corp_id'],
|
||
target_url=config['target_url']
|
||
)
|
||
|
||
@dingtalk_bp.route('/api/getSignature')
|
||
def get_signature():
|
||
print('[后端] [S] ============== get_signature 请求到达 ==============')
|
||
url = request.args.get('url', '')
|
||
print(f'[后端] [S] 收到前端传来的URL: {url}')
|
||
|
||
if not url:
|
||
print('[后端] [S] 缺少url参数')
|
||
return jsonify({'error': '缺少url参数'}), 400
|
||
|
||
config = get_dingtalk_config()
|
||
if not config:
|
||
print('[后端] [S] 配置为空')
|
||
return jsonify({'error': '配置为空'}), 400
|
||
|
||
jsapi_ticket = get_jsapi_ticket()
|
||
if not jsapi_ticket:
|
||
print('[后端] [S] 获取jsapi_ticket失败')
|
||
return jsonify({'error': '获取jsapi_ticket失败'}), 500
|
||
|
||
nonce_str = 'dingtalk' + str(int(time.time()))
|
||
timestamp = str(int(time.time() * 1000))
|
||
|
||
signature = generate_signature(jsapi_ticket, nonce_str, timestamp, url)
|
||
print(f'[后端] [S] ============== get_signature 完成 ==============')
|
||
|
||
return jsonify({
|
||
'signature': signature,
|
||
'nonceStr': nonce_str,
|
||
'timeStamp': timestamp,
|
||
'agentId': config['agent_id'] and int(config['agent_id']) or 0
|
||
})
|
||
|
||
@dingtalk_bp.route('/api/getDingUser')
|
||
def get_ding_user():
|
||
print('[后端] [6] ============== get_ding_user 请求到达 ==============')
|
||
code = request.args.get('code')
|
||
print(f'[后端] [6] 收到 code 参数: {code}')
|
||
|
||
if not code:
|
||
print('[后端] [6] 缺少 code 参数')
|
||
return jsonify({'errcode': 400, 'errmsg': '缺少code参数'})
|
||
|
||
print('[后端] [6] 开始调用 get_user_info_by_code')
|
||
user_info = get_user_info_by_code(code)
|
||
if not user_info:
|
||
print('[后端] [6] 获取用户信息失败')
|
||
return jsonify({'errcode': 500, 'errmsg': '获取用户信息失败'})
|
||
|
||
# 处理新的API响应格式
|
||
result = {
|
||
'errcode': 0,
|
||
'errmsg': 'ok',
|
||
'userid': '',
|
||
'name': '',
|
||
'department': []
|
||
}
|
||
|
||
if user_info.get('errcode') == 0 and user_info.get('result'):
|
||
result['userid'] = user_info['result'].get('userid', '')
|
||
else:
|
||
# 兼容旧格式
|
||
result['userid'] = user_info.get('userId', user_info.get('userid', ''))
|
||
|
||
print(f'[后端] [6] 初步结果: {result}')
|
||
|
||
if result['userid']:
|
||
print('[后端] [6] 有 userid, 调用 get_user_detail 获取详情')
|
||
user_detail = get_user_detail(result['userid'])
|
||
if user_detail and user_detail.get('errcode') == 0 and user_detail.get('result'):
|
||
result['name'] = user_detail['result'].get('name', '')
|
||
dept_ids = user_detail['result'].get('dept_id_list', [])
|
||
print(f'[后端] [6] 更新用户名为: {result["name"]}')
|
||
print(f'[后端] [6] 部门ID列表: {dept_ids}')
|
||
|
||
# 获取部门名称
|
||
if dept_ids:
|
||
print('[后端] [6] 开始获取部门名称')
|
||
dept_names = []
|
||
for dept_id in dept_ids:
|
||
name = get_dept_name(str(dept_id))
|
||
if name:
|
||
dept_names.append(name)
|
||
if dept_names:
|
||
result['department'] = dept_names
|
||
print(f'[后端] [6] 更新部门为: {result["department"]}')
|
||
elif user_detail:
|
||
# 兼容旧格式
|
||
result['name'] = user_detail.get('name', '')
|
||
dept_ids = user_detail.get('department', [])
|
||
if dept_ids:
|
||
dept_names = []
|
||
for dept_id in dept_ids:
|
||
name = get_dept_name(str(dept_id))
|
||
if name:
|
||
dept_names.append(name)
|
||
if dept_names:
|
||
result['department'] = dept_names
|
||
|
||
print(f'[后端] [6] ============== get_ding_user 完成, 返回: {result} ==============')
|
||
return jsonify(result)
|