Files
mashangban/dingtalk.py
2026-05-16 15:54:25 +08:00

405 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import hashlib
import requests
from flask import Blueprint, render_template, jsonify, current_app, request
dingtalk_bp = Blueprint('dingtalk', __name__)
# Imported lazily to avoid a circular import.
def get_user_model():
    """Return the ``User`` class, importing it from ``models`` at call time."""
    from models import User as user_model
    return user_model
def get_dingtalk_config():
    """Read the DingTalk settings from the current Flask app config.

    Returns:
        A dict with the keys ``app_key``, ``app_secret``, ``agent_id``,
        ``corp_id`` and ``target_url``, or ``None`` when the app key or the
        app secret is missing or empty.
    """
    print('[后端] [1] ============== get_dingtalk_config 开始 ==============')
    app_key = current_app.config.get('DINGTALK_APP_KEY', '')
    agent_id = current_app.config.get('DINGTALK_AGENT_ID', '')
    corp_id = current_app.config.get('DINGTALK_CORP_ID', '')
    app_secret = current_app.config.get('DINGTALK_APP_SECRET', '')
    target_url = current_app.config.get('DINGTALK_TARGET_URL', 'http://localhost:5001/requirement-collection')
    print(f'[后端] [1] 配置读取: app_key={app_key}, agent_id={agent_id}, corp_id={corp_id}')
    if not app_key or not app_secret:
        print('[后端] [1] 缺少 app_key 或 app_secret, 返回 None')
        return None
    result = {
        'app_key': app_key,
        'app_secret': app_secret,
        'agent_id': agent_id,
        'corp_id': corp_id,
        'target_url': target_url,
    }
    # The app secret is a long-lived credential: log a copy with it masked,
    # while the caller still receives the real value.
    printable = {**result, 'app_secret': '***'}
    print('[后端] [1] get_dingtalk_config 完成, 返回:', printable)
    return result
def get_access_token():
    """Request an access token from the DingTalk v1.0 OAuth2 endpoint.

    Returns:
        The ``accessToken`` string from the response, or ``None`` when the
        app key/secret is not configured, the request or JSON decoding
        raises, or the response carries no ``accessToken``.
    """
    print('[后端] [2] ============== get_access_token 开始 ==============')
    settings = current_app.config
    key = settings.get('DINGTALK_APP_KEY')
    secret = settings.get('DINGTALK_APP_SECRET')
    if not (key and secret):
        print('[后端] [2] 缺少 app_key 或 app_secret, 返回 None')
        return None

    endpoint = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
    payload = {'appKey': key, 'appSecret': secret}
    print(f'[后端] [2] 请求钉钉API: {endpoint}, data: appKey={key}')
    try:
        reply = requests.post(
            endpoint,
            json=payload,
            headers={'Content-Type': 'application/json'},
            timeout=10,
        ).json()
        print(f'[后端] [2] 钉钉响应:', reply)
        if reply.get('accessToken'):
            print(f'[后端] [2] 获取access_token成功')
            return reply['accessToken']
    except Exception as exc:
        # Best effort: any failure is logged and reported as "no token".
        print(f'[后端] [2] 异常:', exc)
    return None
def get_jsapi_ticket():
    """Fetch a DingTalk JSAPI ticket.

    First obtains an access token from the ``oapi.dingtalk.com/gettoken``
    endpoint, then exchanges it for a ticket via ``get_jsapi_ticket``.

    Returns:
        The ``ticket`` string, or ``None`` when the app key/secret is not
        configured, a request or JSON decoding raises, or either response
        lacks the expected field.
    """
    print('[后端] [2b] ============== get_jsapi_ticket 开始 ==============')
    app_key = current_app.config.get('DINGTALK_APP_KEY')
    app_secret = current_app.config.get('DINGTALK_APP_SECRET')
    if not app_key or not app_secret:
        print('[后端] [2b] 缺少 app_key 或 app_secret, 返回 None')
        return None
    token_url = 'https://oapi.dingtalk.com/gettoken'
    # Log only the endpoint: the query string carries the app secret.
    print(f'[后端] [2b] 请求钉钉API: {token_url}')
    try:
        # ``params`` lets requests URL-encode the credentials instead of
        # interpolating them raw into the URL.
        response = requests.get(
            token_url,
            params={'appkey': app_key, 'appsecret': app_secret},
            timeout=10,
        )
        result = response.json()
        # A successful payload contains the token itself, so log status only.
        print('[后端] [2b] 获取token响应:',
              {'errcode': result.get('errcode'), 'errmsg': result.get('errmsg')})
        access_token = result.get('access_token')
        if access_token:
            ticket_url = 'https://oapi.dingtalk.com/get_jsapi_ticket'
            print(f'[后端] [2b] 请求ticket API: {ticket_url}')
            ticket_response = requests.get(
                ticket_url,
                params={'access_token': access_token},
                timeout=10,
            )
            ticket_result = ticket_response.json()
            print('[后端] [2b] 获取ticket响应:',
                  {'errcode': ticket_result.get('errcode'),
                   'errmsg': ticket_result.get('errmsg')})
            ticket = ticket_result.get('ticket')
            if ticket:
                print('[后端] [2b] 获取jsapi_ticket成功')
                return ticket
    except Exception as e:
        # Best effort: any failure is logged and reported as "no ticket".
        print('[后端] [2b] 异常:', e)
    return None
def generate_signature(jsapi_ticket, nonce_str, timestamp, url):
    """Return the SHA-1 hex digest of the JSAPI signing string.

    The signing string is
    ``jsapi_ticket=<ticket>&noncestr=<nonce>&timestamp=<ts>&url=<url>``,
    encoded as UTF-8 before hashing.
    """
    print(f'[后端] [2c] ============== generate_signature 开始 ==============')
    # Only the first 20 characters of the ticket go into this log line.
    ticket_preview = jsapi_ticket[:20] if jsapi_ticket else "None"
    print(f'[后端] [2c] 参数: jsapi_ticket={ticket_preview}..., nonce_str={nonce_str}, timestamp={timestamp}, url={url}')
    fields = (
        ('jsapi_ticket', jsapi_ticket),
        ('noncestr', nonce_str),
        ('timestamp', timestamp),
        ('url', url),
    )
    sign_str = '&'.join(f'{key}={value}' for key, value in fields)
    print(f'[后端] [2c] 签名字符串: {sign_str}')
    digest = hashlib.sha1(sign_str.encode('utf-8'))
    signature = digest.hexdigest()
    print(f'[后端] [2c] 签名结果: {signature}')
    return signature
def get_user_info_by_code(code):
    """Exchange a temporary auth code for user info via the DingTalk API.

    Returns:
        The decoded JSON response as returned by
        ``topapi/v2/user/getuserinfo``, or ``None`` when no access token
        could be obtained or the request/JSON decoding raises.
    """
    print(f'[后端] [3] ============== get_user_info_by_code 开始, code={code} ==============')
    token = get_access_token()
    if not token:
        print('[后端] [3] 获取 access_token 失败, 返回 None')
        return None

    # Micro-app endpoint that resolves a temporary auth code to a user.
    endpoint = f'https://oapi.dingtalk.com/topapi/v2/user/getuserinfo?access_token={token}'
    print(f'[后端] [3] 请求钉钉API: {endpoint}')
    try:
        reply = requests.post(
            endpoint,
            json={'code': code},
            headers={'Content-Type': 'application/json'},
            timeout=10,
        )
        payload = reply.json()
        print(f'[后端] [3] 钉钉用户信息响应:', payload)
        return payload
    except Exception as exc:
        print(f'[后端] [3] 异常:', exc)
    return None
def get_user_detail(user_id):
    """Fetch a user's detail record from ``topapi/v2/user/get``.

    Returns:
        The decoded JSON response, or ``None`` when no access token could be
        obtained or the request/JSON decoding raises.
    """
    print(f'[后端] [4] ============== get_user_detail 开始, user_id={user_id} ==============')
    token = get_access_token()
    if not token:
        print('[后端] [4] 获取 access_token 失败, 返回 None')
        return None

    # v2 user-detail endpoint.
    endpoint = f'https://oapi.dingtalk.com/topapi/v2/user/get?access_token={token}'
    print(f'[后端] [4] 请求钉钉API: {endpoint}')
    try:
        reply = requests.post(
            endpoint,
            json={'userid': user_id},
            headers={'Content-Type': 'application/json'},
            timeout=10,
        )
        payload = reply.json()
        print(f'[后端] [4] 钉钉用户详情响应:', payload)
        return payload
    except Exception as exc:
        print(f'[后端] [4] 异常:', exc)
    return None
def get_dept_name(dept_id):
    """Look up a department's name via ``topapi/v2/department/get``.

    ``dept_id`` is converted with ``int()`` before the request is sent.

    Returns:
        The department name, or ``None`` when no access token could be
        obtained, the request/JSON decoding raises, or the response is not a
        success (``errcode`` 0) carrying a non-empty ``result.name``.
    """
    print(f'[后端] [5] ============== get_dept_name 开始, dept_id={dept_id} ==============')
    token = get_access_token()
    if not token:
        print('[后端] [5] 获取 access_token 失败, 返回 None')
        return None

    # v2 department-detail endpoint.
    endpoint = f'https://oapi.dingtalk.com/topapi/v2/department/get?access_token={token}'
    print(f'[后端] [5] 请求钉钉API: {endpoint}')
    request_headers = {'Content-Type': 'application/json'}
    body = {'dept_id': int(dept_id)}
    try:
        payload = requests.post(endpoint, json=body, headers=request_headers, timeout=10).json()
        print(f'[后端] [5] 钉钉部门信息响应:', payload)
        succeeded = payload.get('errcode') == 0
        if succeeded and payload.get('result') and payload['result'].get('name'):
            print(f'[后端] [5] 获取部门名称成功: {payload["result"]["name"]}')
            return payload['result']['name']
    except Exception as exc:
        print(f'[后端] [5] 异常:', exc)
    return None
def send_dingtalk_card_to_user(user_id, title, content, jump_url):
    """Send a DingTalk ``link`` message to the given user.

    The message is posted to the ``corpconversation/asyncsend_v2`` endpoint
    using the configured ``DINGTALK_AGENT_ID`` (0 when unset).

    Returns:
        ``True`` when the response ``errcode`` is 0; ``False`` when no access
        token could be obtained, the API reports an error, or the
        request/JSON decoding raises.
    """
    print(f'[后端] [N] ============== send_dingtalk_card_to_user 开始, user_id={user_id} ==============')
    token = get_access_token()
    if not token:
        print('[后端] [N] 获取 access_token 失败')
        return False

    endpoint = f'https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2?access_token={token}'
    request_headers = {'Content-Type': 'application/json'}
    agent_id = current_app.config.get('DINGTALK_AGENT_ID', '')
    link_body = {
        'title': title,
        'text': content,
        'picUrl': 'https://img.alicdn.com/tfs/TB1NwmBkeL2gK0jSZPhXXahviXa-72-72.png',
        'messageUrl': jump_url,
    }
    data = {
        'agent_id': int(agent_id) if agent_id else 0,
        'userid_list': user_id,
        'msg': {'msgtype': 'link', 'link': link_body},
    }
    print(f'[后端] [N] 发送消息卡片: {data}')
    try:
        reply = requests.post(endpoint, json=data, headers=request_headers, timeout=10).json()
        print(f'[后端] [N] 钉钉响应:', reply)
        if reply.get('errcode') != 0:
            print(f'[后端] [N] 消息发送失败: {reply.get("errmsg", "未知错误")}')
            return False
        print(f'[后端] [N] 消息发送成功')
        return True
    except Exception as exc:
        print(f'[后端] [N] 异常:', exc)
        return False
def notify_admins_new_demand(demand, base_url):
    """Notify every admin user that a new demand was submitted.

    Queries all users with ``role='admin'`` and sends each one that has a
    ``dingtalk_userid`` a link message pointing at
    ``<base_url>/demand/<demand.id>/answer``. Returns ``None``.
    """
    print(f'[后端] [N] ============== notify_admins_new_demand 开始 ==============')
    # Load every admin user.
    from models import User
    admin_users = User.query.filter_by(role='admin').all()
    total = len(admin_users)
    print(f'[后端] [N] 找到 {total} 个管理员')
    if not admin_users:
        print('[后端] [N] 没有管理员用户,跳过通知')
        return

    # Message text.
    title = f'📋 新需求提交:{demand.title}'
    content = f'''需求标题:{demand.title}
提交者:{demand.contact or "未知"}
分会:{demand.branch}
点击查看并回答'''.strip()
    # Link to the answer page.
    jump_url = f'{base_url}/demand/{demand.id}/answer'

    # Count only the admins that have a DingTalk user id and whose send succeeded.
    success_count = sum(
        1
        for admin in admin_users
        if admin.dingtalk_userid
        and send_dingtalk_card_to_user(admin.dingtalk_userid, title, content, jump_url)
    )
    print(f'[后端] [N] 消息发送完成,成功: {success_count}/{total}')
def notify_asker_answer(demand, base_url):
    """Notify the user who submitted ``demand`` that it has been answered.

    Looks the asker up by ``demand.user_id`` and, when the asker exists and
    has a ``dingtalk_userid``, sends a link message pointing at
    ``<base_url>/dingtalk``. Returns ``None``.
    """
    print(f'[后端] [N] ============== notify_asker_answer 开始 ==============')
    # Look up the asker.
    user_model = get_user_model()
    asker = user_model.query.get(demand.user_id)
    if not asker:
        print('[后端] [N] 未找到提问者,跳过通知')
        return
    if not asker.dingtalk_userid:
        print(f'[后端] [N] 提问者 {asker.username} 没有钉钉userid跳过通知')
        return

    # Message text.
    title = f'✅ 您的需求已被回答:{demand.title}'
    content = f'''需求标题:{demand.title}
管理员已回复您的问题,点击查看详情'''.strip()
    # The link points at the DingTalk entry page.
    jump_url = f'{base_url}/dingtalk'

    print(f'[后端] [N] 准备通知提问者: {asker.username}, dingtalk_userid: {asker.dingtalk_userid}')
    delivered = send_dingtalk_card_to_user(asker.dingtalk_userid, title, content, jump_url)
    if not delivered:
        print(f'[后端] [N] 提问者通知发送失败')
    else:
        print(f'[后端] [N] 提问者通知发送成功')
@dingtalk_bp.route('/')
def dingtalk_entry():
    """Render the DingTalk entry page.

    When the DingTalk config is available the page is rendered with its
    values. Otherwise it is rendered with empty identifiers and the default
    target URL, and the response status is 400.
    """
    print('[后端] [0] ============== dingtalk_entry 请求到达 ==============')
    config = get_dingtalk_config()
    if config:
        print('[后端] [0] 渲染 dingtalk_entry.html')
        context = {
            key: config[key]
            for key in ('app_key', 'agent_id', 'corp_id', 'target_url')
        }
        return render_template('dingtalk_entry.html', **context)

    print('[后端] [0] 配置为空,使用默认值')
    fallback = {
        'app_key': '',
        'agent_id': '',
        'corp_id': '',
        'target_url': 'http://localhost:5001/requirement-collection',
    }
    return render_template('dingtalk_entry.html', **fallback), 400
@dingtalk_bp.route('/api/getSignature')
def get_signature():
    """Return the JSAPI signature data for the URL given in ``?url=``.

    Responds with JSON containing ``signature``, ``nonceStr``, ``timeStamp``
    and ``agentId``. Responds 400 when the ``url`` parameter or the DingTalk
    config is missing, and 500 when no JSAPI ticket could be obtained.
    """
    print('[后端] [S] ============== get_signature 请求到达 ==============')
    page_url = request.args.get('url', '')
    print(f'[后端] [S] 收到前端传来的URL: {page_url}')
    if not page_url:
        print('[后端] [S] 缺少url参数')
        return jsonify({'error': '缺少url参数'}), 400

    config = get_dingtalk_config()
    if not config:
        print('[后端] [S] 配置为空')
        return jsonify({'error': '配置为空'}), 400

    ticket = get_jsapi_ticket()
    if not ticket:
        print('[后端] [S] 获取jsapi_ticket失败')
        return jsonify({'error': '获取jsapi_ticket失败'}), 500

    # Nonce is built from the time in seconds, timestamp from the time in milliseconds.
    nonce_str = f'dingtalk{int(time.time())}'
    timestamp = str(int(time.time() * 1000))
    signature = generate_signature(ticket, nonce_str, timestamp, page_url)
    print(f'[后端] [S] ============== get_signature 完成 ==============')

    raw_agent_id = config['agent_id']
    payload = {
        'signature': signature,
        'nonceStr': nonce_str,
        'timeStamp': timestamp,
        'agentId': int(raw_agent_id) if raw_agent_id else 0,
    }
    return jsonify(payload)
def _lookup_dept_names(dept_ids):
    """Resolve department IDs to names, dropping IDs whose lookup returns nothing."""
    names = (get_dept_name(str(dept_id)) for dept_id in dept_ids)
    return [name for name in names if name]


@dingtalk_bp.route('/api/getDingUser')
def get_ding_user():
    """Resolve the ``?code=`` auth code to a DingTalk user summary.

    Responds with JSON containing ``errcode``, ``errmsg``, ``userid``,
    ``name`` and ``department`` (a list of department names). A missing code
    yields ``errcode`` 400 and a failed user-info lookup yields ``errcode``
    500; both are carried in the JSON body, not the HTTP status.
    """
    print('[后端] [6] ============== get_ding_user 请求到达 ==============')
    code = request.args.get('code')
    print(f'[后端] [6] 收到 code 参数: {code}')
    if not code:
        print('[后端] [6] 缺少 code 参数')
        return jsonify({'errcode': 400, 'errmsg': '缺少code参数'})
    print('[后端] [6] 开始调用 get_user_info_by_code')
    user_info = get_user_info_by_code(code)
    if not user_info:
        print('[后端] [6] 获取用户信息失败')
        return jsonify({'errcode': 500, 'errmsg': '获取用户信息失败'})

    # Response skeleton; the fields below are filled in as lookups succeed.
    result = {
        'errcode': 0,
        'errmsg': 'ok',
        'userid': '',
        'name': '',
        'department': []
    }
    if user_info.get('errcode') == 0 and user_info.get('result'):
        result['userid'] = user_info['result'].get('userid', '')
    else:
        # Fall back to the older flat response format.
        # NOTE(review): an API error response also lands here and ends up as
        # errcode 0 with an empty userid — confirm the frontend expects that.
        result['userid'] = user_info.get('userId', user_info.get('userid', ''))
    print(f'[后端] [6] 初步结果: {result}')

    if result['userid']:
        print('[后端] [6] 有 userid, 调用 get_user_detail 获取详情')
        user_detail = get_user_detail(result['userid'])
        if user_detail and user_detail.get('errcode') == 0 and user_detail.get('result'):
            detail = user_detail['result']
            result['name'] = detail.get('name', '')
            dept_ids = detail.get('dept_id_list', [])
            print(f'[后端] [6] 更新用户名为: {result["name"]}')
            print(f'[后端] [6] 部门ID列表: {dept_ids}')
            # Resolve department names.
            if dept_ids:
                print('[后端] [6] 开始获取部门名称')
                dept_names = _lookup_dept_names(dept_ids)
                if dept_names:
                    result['department'] = dept_names
                    print(f'[后端] [6] 更新部门为: {result["department"]}')
        elif user_detail:
            # Fall back to the older flat response format.
            result['name'] = user_detail.get('name', '')
            dept_ids = user_detail.get('department', [])
            if dept_ids:
                dept_names = _lookup_dept_names(dept_ids)
                if dept_names:
                    result['department'] = dept_names

    print(f'[后端] [6] ============== get_ding_user 完成, 返回: {result} ==============')
    return jsonify(result)