293 lines
11 KiB
Python
293 lines
11 KiB
Python
import time
|
|
import hashlib
|
|
import requests
|
|
from flask import Blueprint, render_template, jsonify, current_app, request
|
|
|
|
# Flask blueprint on which this module's DingTalk routes are registered.
dingtalk_bp = Blueprint('dingtalk', __name__)
|
|
|
|
def get_dingtalk_config():
    """Read the DingTalk settings from the Flask application config.

    Returns:
        A dict with the keys ``app_key``, ``app_secret``, ``agent_id``,
        ``corp_id`` and ``target_url``, or ``None`` when ``DINGTALK_APP_KEY``
        or ``DINGTALK_APP_SECRET`` is missing or empty.
    """
    print('[后端] [1] ============== get_dingtalk_config 开始 ==============')
    app_key = current_app.config.get('DINGTALK_APP_KEY', '')
    agent_id = current_app.config.get('DINGTALK_AGENT_ID', '')
    corp_id = current_app.config.get('DINGTALK_CORP_ID', '')
    app_secret = current_app.config.get('DINGTALK_APP_SECRET', '')
    target_url = current_app.config.get(
        'DINGTALK_TARGET_URL',
        'http://localhost:5001/requirement-collection',
    )

    print(f'[后端] [1] 配置读取: app_key={app_key}, agent_id={agent_id}, corp_id={corp_id}')

    if not app_key or not app_secret:
        print('[后端] [1] 缺少 app_key 或 app_secret, 返回 None')
        return None

    result = {
        'app_key': app_key,
        'app_secret': app_secret,
        'agent_id': agent_id,
        'corp_id': corp_id,
        'target_url': target_url,
    }
    # The app secret must never reach the logs: print a redacted copy while
    # still returning the real value to the caller.
    print('[后端] [1] get_dingtalk_config 完成, 返回:', {**result, 'app_secret': '***'})
    return result
|
|
|
|
def get_access_token():
    """Request an access token from the DingTalk v1.0 OAuth2 endpoint.

    Returns:
        The ``accessToken`` value of the response, or ``None`` when the app
        key/secret are not configured, the request or JSON parsing fails, or
        the response carries no token.
    """
    print('[后端] [2] ============== get_access_token 开始 ==============')
    app_key = current_app.config.get('DINGTALK_APP_KEY')
    app_secret = current_app.config.get('DINGTALK_APP_SECRET')

    if not app_key or not app_secret:
        print('[后端] [2] 缺少 app_key 或 app_secret, 返回 None')
        return None

    url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
    headers = {'Content-Type': 'application/json'}
    data = {'appKey': app_key, 'appSecret': app_secret}
    print(f'[后端] [2] 请求钉钉API: {url}, data: appKey={app_key}')

    try:
        response = requests.post(url, json=data, headers=headers, timeout=10)
        result = response.json()
        access_token = result.get('accessToken')
        # Log the response without the token itself so the credential does
        # not end up in plaintext logs.
        loggable = {key: value for key, value in result.items() if key != 'accessToken'}
        print('[后端] [2] 钉钉响应:', loggable)
        if access_token:
            print('[后端] [2] 获取access_token成功')
            return access_token
    except Exception as e:
        # Best effort: any network or parsing failure is reported as "no token".
        print('[后端] [2] 异常:', e)
    return None
|
|
|
|
def get_jsapi_ticket():
    """Obtain a JSAPI ticket from the oapi.dingtalk.com endpoints.

    Calls ``/gettoken`` with the configured app key/secret, then passes the
    returned access token to ``/get_jsapi_ticket``.

    Returns:
        The ``ticket`` value of the second response, or ``None`` when the
        credentials are not configured, either request fails, or either
        response lacks the expected field.
    """
    print('[后端] [2b] ============== get_jsapi_ticket 开始 ==============')
    app_key = current_app.config.get('DINGTALK_APP_KEY')
    app_secret = current_app.config.get('DINGTALK_APP_SECRET')

    if not app_key or not app_secret:
        print('[后端] [2b] 缺少 app_key 或 app_secret, 返回 None')
        return None

    # Credentials go through ``params`` so they are URL-encoded by requests
    # and never appear in the logged URL.
    token_url = 'https://oapi.dingtalk.com/gettoken'
    print(f'[后端] [2b] 请求钉钉API: {token_url}')

    try:
        response = requests.get(
            token_url,
            params={'appkey': app_key, 'appsecret': app_secret},
            timeout=10,
        )
        result = response.json()
        access_token = result.get('access_token')
        print(
            '[后端] [2b] 获取token响应:',
            {key: value for key, value in result.items() if key != 'access_token'},
        )
        if not access_token:
            return None

        ticket_url = 'https://oapi.dingtalk.com/get_jsapi_ticket'
        print(f'[后端] [2b] 请求ticket API: {ticket_url}')
        ticket_response = requests.get(
            ticket_url,
            params={'access_token': access_token},
            timeout=10,
        )
        ticket_result = ticket_response.json()
        ticket = ticket_result.get('ticket')
        print(
            '[后端] [2b] 获取ticket响应:',
            {key: value for key, value in ticket_result.items() if key != 'ticket'},
        )
        if ticket:
            print('[后端] [2b] 获取jsapi_ticket成功')
            return ticket
    except Exception as e:
        # Best effort: any network or parsing failure is reported as "no ticket".
        print('[后端] [2b] 异常:', e)
    return None
|
|
|
|
def generate_signature(jsapi_ticket, nonce_str, timestamp, url):
    """Compute the SHA-1 signature over the JSAPI signing string.

    The signed string is
    ``jsapi_ticket=<ticket>&noncestr=<nonce>&timestamp=<timestamp>&url=<url>``,
    encoded as UTF-8.

    Args:
        jsapi_ticket: Ticket value to sign with.
        nonce_str: Nonce that is also sent to the client.
        timestamp: Timestamp that is also sent to the client.
        url: URL of the page requesting the signature.

    Returns:
        The signature as a 40-character lowercase hex string.
    """
    print('[后端] [2c] ============== generate_signature 开始 ==============')
    # Only a short prefix of the ticket is logged.
    ticket_preview = jsapi_ticket[:20] if jsapi_ticket else "None"
    print(f'[后端] [2c] 参数: jsapi_ticket={ticket_preview}..., nonce_str={nonce_str}, timestamp={timestamp}, url={url}')

    # The separator before "timestamp" must be a literal "&"; the previous
    # text had been mangled into "×tamp", producing invalid signatures.
    sign_str = f'jsapi_ticket={jsapi_ticket}&noncestr={nonce_str}&timestamp={timestamp}&url={url}'
    print(f'[后端] [2c] 签名字符串: jsapi_ticket={ticket_preview}...&noncestr={nonce_str}&timestamp={timestamp}&url={url}')

    signature = hashlib.sha1(sign_str.encode('utf-8')).hexdigest()
    print(f'[后端] [2c] 签名结果: {signature}')
    return signature
|
|
|
|
def get_user_info_by_code(code):
    """Exchange a temporary auth code for user info via ``topapi/v2/user/getuserinfo``.

    Args:
        code: Temporary authorization code supplied by the client.

    Returns:
        The parsed JSON response as returned by the API (not validated
        here), or ``None`` when no access token could be obtained or the
        request/parsing fails.
    """
    print(f'[后端] [3] ============== get_user_info_by_code 开始, code={code} ==============')
    access_token = get_access_token()
    if not access_token:
        print('[后端] [3] 获取 access_token 失败, 返回 None')
        return None

    # The token is passed through ``params`` so it is URL-encoded and kept
    # out of the logged URL.
    url = 'https://oapi.dingtalk.com/topapi/v2/user/getuserinfo'
    print(f'[后端] [3] 请求钉钉API: {url}')

    headers = {'Content-Type': 'application/json'}
    data = {'code': code}

    try:
        response = requests.post(
            url,
            params={'access_token': access_token},
            json=data,
            headers=headers,
            timeout=10,
        )
        result = response.json()
        print('[后端] [3] 钉钉用户信息响应:', result)
        return result
    except Exception as e:
        # Best effort: any network or parsing failure is reported as None.
        print('[后端] [3] 异常:', e)
        return None
|
|
|
|
def get_user_detail(user_id):
    """Fetch a user's detail record via ``topapi/v2/user/get``.

    Args:
        user_id: DingTalk user ID, sent as ``userid`` in the request body.

    Returns:
        The parsed JSON response as returned by the API (not validated
        here), or ``None`` when no access token could be obtained or the
        request/parsing fails.
    """
    print(f'[后端] [4] ============== get_user_detail 开始, user_id={user_id} ==============')
    access_token = get_access_token()
    if not access_token:
        print('[后端] [4] 获取 access_token 失败, 返回 None')
        return None

    # The token is passed through ``params`` so it is URL-encoded and kept
    # out of the logged URL.
    url = 'https://oapi.dingtalk.com/topapi/v2/user/get'
    print(f'[后端] [4] 请求钉钉API: {url}')

    headers = {'Content-Type': 'application/json'}
    data = {'userid': user_id}

    try:
        response = requests.post(
            url,
            params={'access_token': access_token},
            json=data,
            headers=headers,
            timeout=10,
        )
        result = response.json()
        print('[后端] [4] 钉钉用户详情响应:', result)
        return result
    except Exception as e:
        # Best effort: any network or parsing failure is reported as None.
        print('[后端] [4] 异常:', e)
        return None
|
|
|
|
def get_dept_name(dept_id):
    """Look up a department's name via ``topapi/v2/department/get``.

    Args:
        dept_id: Department ID; converted with ``int()`` before being sent.

    Returns:
        The ``result.name`` field of a response whose ``errcode`` is 0, or
        ``None`` when no access token could be obtained, ``dept_id`` is not
        numeric, the request/parsing fails, or the response has no name.
    """
    print(f'[后端] [5] ============== get_dept_name 开始, dept_id={dept_id} ==============')
    access_token = get_access_token()
    if not access_token:
        print('[后端] [5] 获取 access_token 失败, 返回 None')
        return None

    # The token is passed through ``params`` so it is URL-encoded and kept
    # out of the logged URL.
    url = 'https://oapi.dingtalk.com/topapi/v2/department/get'
    print(f'[后端] [5] 请求钉钉API: {url}')

    headers = {'Content-Type': 'application/json'}

    try:
        # The conversion lives inside the try so a non-numeric dept_id is
        # reported as "no name" instead of raising out of the caller.
        data = {'dept_id': int(dept_id)}
        response = requests.post(
            url,
            params={'access_token': access_token},
            json=data,
            headers=headers,
            timeout=10,
        )
        result = response.json()
        print('[后端] [5] 钉钉部门信息响应:', result)
        if result.get('errcode') == 0 and result.get('result') and result['result'].get('name'):
            print(f'[后端] [5] 获取部门名称成功: {result["result"]["name"]}')
            return result['result']['name']
    except Exception as e:
        # Best effort: any conversion, network or parsing failure yields None.
        print('[后端] [5] 异常:', e)
    return None
|
|
|
|
@dingtalk_bp.route('/')
def dingtalk_entry():
    """Render the DingTalk entry page.

    Uses the values from ``get_dingtalk_config()`` when available; otherwise
    renders the page with empty identifiers and the default target URL and
    responds with HTTP 400.
    """
    print('[后端] [0] ============== dingtalk_entry 请求到达 ==============')

    settings = get_dingtalk_config()

    if settings:
        print('[后端] [0] 渲染 dingtalk_entry.html')
        template_fields = ('app_key', 'agent_id', 'corp_id', 'target_url')
        context = {field: settings[field] for field in template_fields}
        return render_template('dingtalk_entry.html', **context)

    # No usable configuration: fall back to blank values and signal the error.
    print('[后端] [0] 配置为空,使用默认值')
    fallback = {
        'app_key': '',
        'agent_id': '',
        'corp_id': '',
        'target_url': 'http://localhost:5001/requirement-collection',
    }
    return render_template('dingtalk_entry.html', **fallback), 400
|
|
|
|
@dingtalk_bp.route('/api/getSignature')
def get_signature():
    """Return the JSAPI signing parameters for the URL given in ``?url=``.

    Responds with JSON holding ``signature``, ``nonceStr``, ``timeStamp`` and
    ``agentId``. Returns HTTP 400 when the ``url`` parameter or the DingTalk
    configuration is missing, and HTTP 500 when no JSAPI ticket is obtained.
    """
    print('[后端] [S] ============== get_signature 请求到达 ==============')
    page_url = request.args.get('url', '')
    print(f'[后端] [S] 收到前端传来的URL: {page_url}')

    if not page_url:
        print('[后端] [S] 缺少url参数')
        return jsonify({'error': '缺少url参数'}), 400

    settings = get_dingtalk_config()
    if not settings:
        print('[后端] [S] 配置为空')
        return jsonify({'error': '配置为空'}), 400

    ticket = get_jsapi_ticket()
    if not ticket:
        print('[后端] [S] 获取jsapi_ticket失败')
        return jsonify({'error': '获取jsapi_ticket失败'}), 500

    # Nonce is built from whole seconds, the timestamp from milliseconds.
    nonce = f'dingtalk{int(time.time())}'
    millis = str(int(time.time() * 1000))

    signed = generate_signature(ticket, nonce, millis, page_url)
    print('[后端] [S] ============== get_signature 完成 ==============')

    raw_agent_id = settings['agent_id']
    payload = {
        'signature': signed,
        'nonceStr': nonce,
        'timeStamp': millis,
        # An empty/missing agent id is reported as 0.
        'agentId': int(raw_agent_id) if raw_agent_id else 0,
    }
    return jsonify(payload)
|
|
|
|
def _resolve_dept_names(dept_ids):
    """Look up the name of each department ID, dropping IDs that yield no name."""
    lookups = (get_dept_name(str(dept_id)) for dept_id in dept_ids or [])
    return [name for name in lookups if name]


@dingtalk_bp.route('/api/getDingUser')
def get_ding_user():
    """Resolve the auth code in ``?code=`` to a user id, name and department names.

    Responds with JSON holding ``errcode``, ``errmsg``, ``userid``, ``name``
    and ``department``. A missing code yields ``errcode`` 400 and a failed
    user-info lookup yields ``errcode`` 500; both are reported in the JSON
    body only.
    """
    print('[后端] [6] ============== get_ding_user 请求到达 ==============')
    code = request.args.get('code')
    print(f'[后端] [6] 收到 code 参数: {code}')

    if not code:
        print('[后端] [6] 缺少 code 参数')
        return jsonify({'errcode': 400, 'errmsg': '缺少code参数'})

    print('[后端] [6] 开始调用 get_user_info_by_code')
    user_info = get_user_info_by_code(code)
    if not user_info:
        print('[后端] [6] 获取用户信息失败')
        return jsonify({'errcode': 500, 'errmsg': '获取用户信息失败'})

    result = {
        'errcode': 0,
        'errmsg': 'ok',
        'userid': '',
        'name': '',
        'department': [],
    }

    # New response format nests the payload under "result".
    if user_info.get('errcode') == 0 and user_info.get('result'):
        result['userid'] = user_info['result'].get('userid', '')
    else:
        # Legacy format: the user id sits at the top level.
        # NOTE(review): a response with a non-zero errcode also lands here
        # and is still answered with errcode 0 and an empty userid — confirm
        # whether the client relies on that.
        result['userid'] = user_info.get('userId', user_info.get('userid', ''))

    print(f'[后端] [6] 初步结果: {result}')

    if result['userid']:
        print('[后端] [6] 有 userid, 调用 get_user_detail 获取详情')
        user_detail = get_user_detail(result['userid'])
        dept_ids = []

        if user_detail and user_detail.get('errcode') == 0 and user_detail.get('result'):
            detail = user_detail['result']
            result['name'] = detail.get('name', '')
            dept_ids = detail.get('dept_id_list', [])
            print(f'[后端] [6] 更新用户名为: {result["name"]}')
            print(f'[后端] [6] 部门ID列表: {dept_ids}')
        elif user_detail:
            # Legacy format: fields sit at the top level.
            result['name'] = user_detail.get('name', '')
            dept_ids = user_detail.get('department', [])

        # Both formats share one department-name lookup.
        if dept_ids:
            print('[后端] [6] 开始获取部门名称')
            dept_names = _resolve_dept_names(dept_ids)
            if dept_names:
                result['department'] = dept_names
                print(f'[后端] [6] 更新部门为: {result["department"]}')

    print(f'[后端] [6] ============== get_ding_user 完成, 返回: {result} ==============')
    return jsonify(result)
|