初始提交:人事共享服务中心钉钉登录功能

This commit is contained in:
zsc
2026-05-16 11:15:24 +08:00
commit 7ba21d6413
23 changed files with 1770 additions and 0 deletions

292
dingtalk.py Normal file
View File

@@ -0,0 +1,292 @@
import time
import hashlib
import requests
from flask import Blueprint, render_template, jsonify, current_app, request
dingtalk_bp = Blueprint('dingtalk', __name__)
def get_dingtalk_config():
print('[后端] [1] ============== get_dingtalk_config 开始 ==============')
app_key = current_app.config.get('DINGTALK_APP_KEY', '')
agent_id = current_app.config.get('DINGTALK_AGENT_ID', '')
corp_id = current_app.config.get('DINGTALK_CORP_ID', '')
app_secret = current_app.config.get('DINGTALK_APP_SECRET', '')
target_url = current_app.config.get('DINGTALK_TARGET_URL', 'http://localhost:5001/requirement-collection')
print(f'[后端] [1] 配置读取: app_key={app_key}, agent_id={agent_id}, corp_id={corp_id}')
if not app_key or not app_secret:
print(f'[后端] [1] 缺少 app_key 或 app_secret, 返回 None')
return None
result = {
'app_key': app_key,
'app_secret': app_secret,
'agent_id': agent_id,
'corp_id': corp_id,
'target_url': target_url
}
print(f'[后端] [1] get_dingtalk_config 完成, 返回:', result)
return result
def get_access_token():
print('[后端] [2] ============== get_access_token 开始 ==============')
app_key = current_app.config.get('DINGTALK_APP_KEY')
app_secret = current_app.config.get('DINGTALK_APP_SECRET')
if not app_key or not app_secret:
print('[后端] [2] 缺少 app_key 或 app_secret, 返回 None')
return None
url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
headers = {'Content-Type': 'application/json'}
data = {'appKey': app_key, 'appSecret': app_secret}
print(f'[后端] [2] 请求钉钉API: {url}, data: appKey={app_key}')
try:
response = requests.post(url, json=data, headers=headers, timeout=10)
result = response.json()
print(f'[后端] [2] 钉钉响应:', result)
if result.get('accessToken'):
print(f'[后端] [2] 获取access_token成功')
return result['accessToken']
except Exception as e:
print(f'[后端] [2] 异常:', e)
return None
def get_jsapi_ticket():
print('[后端] [2b] ============== get_jsapi_ticket 开始 ==============')
app_key = current_app.config.get('DINGTALK_APP_KEY')
app_secret = current_app.config.get('DINGTALK_APP_SECRET')
if not app_key or not app_secret:
print('[后端] [2b] 缺少 app_key 或 app_secret, 返回 None')
return None
url = f'https://oapi.dingtalk.com/gettoken?appkey={app_key}&appsecret={app_secret}'
print(f'[后端] [2b] 请求钉钉API: {url}')
try:
response = requests.get(url, timeout=10)
result = response.json()
print(f'[后端] [2b] 获取token响应:', result)
if result.get('access_token'):
access_token = result['access_token']
ticket_url = f'https://oapi.dingtalk.com/get_jsapi_ticket?access_token={access_token}'
print(f'[后端] [2b] 请求ticket API: {ticket_url}')
ticket_response = requests.get(ticket_url, timeout=10)
ticket_result = ticket_response.json()
print(f'[后端] [2b] 获取ticket响应:', ticket_result)
if ticket_result.get('ticket'):
print(f'[后端] [2b] 获取jsapi_ticket成功')
return ticket_result['ticket']
except Exception as e:
print(f'[后端] [2b] 异常:', e)
return None
def generate_signature(jsapi_ticket, nonce_str, timestamp, url):
print(f'[后端] [2c] ============== generate_signature 开始 ==============')
print(f'[后端] [2c] 参数: jsapi_ticket={jsapi_ticket[:20] if jsapi_ticket else "None"}..., nonce_str={nonce_str}, timestamp={timestamp}, url={url}')
sign_str = f'jsapi_ticket={jsapi_ticket}&noncestr={nonce_str}&timestamp={timestamp}&url={url}'
print(f'[后端] [2c] 签名字符串: {sign_str}')
signature = hashlib.sha1(sign_str.encode('utf-8')).hexdigest()
print(f'[后端] [2c] 签名结果: {signature}')
return signature
def get_user_info_by_code(code):
print(f'[后端] [3] ============== get_user_info_by_code 开始, code={code} ==============')
access_token = get_access_token()
if not access_token:
print('[后端] [3] 获取 access_token 失败, 返回 None')
return None
# 使用钉钉微应用API通过临时授权码获取用户信息
url = f'https://oapi.dingtalk.com/topapi/v2/user/getuserinfo?access_token={access_token}'
print(f'[后端] [3] 请求钉钉API: {url}')
headers = {'Content-Type': 'application/json'}
data = {'code': code}
try:
response = requests.post(url, json=data, headers=headers, timeout=10)
result = response.json()
print(f'[后端] [3] 钉钉用户信息响应:', result)
return result
except Exception as e:
print(f'[后端] [3] 异常:', e)
return None
def get_user_detail(user_id):
print(f'[后端] [4] ============== get_user_detail 开始, user_id={user_id} ==============')
access_token = get_access_token()
if not access_token:
print('[后端] [4] 获取 access_token 失败, 返回 None')
return None
# 使用新版API获取用户详情
url = f'https://oapi.dingtalk.com/topapi/v2/user/get?access_token={access_token}'
print(f'[后端] [4] 请求钉钉API: {url}')
headers = {'Content-Type': 'application/json'}
data = {'userid': user_id}
try:
response = requests.post(url, json=data, headers=headers, timeout=10)
result = response.json()
print(f'[后端] [4] 钉钉用户详情响应:', result)
return result
except Exception as e:
print(f'[后端] [4] 异常:', e)
return None
def get_dept_name(dept_id):
print(f'[后端] [5] ============== get_dept_name 开始, dept_id={dept_id} ==============')
access_token = get_access_token()
if not access_token:
print('[后端] [5] 获取 access_token 失败, 返回 None')
return None
# 使用新版API获取部门信息
url = f'https://oapi.dingtalk.com/topapi/v2/department/get?access_token={access_token}'
print(f'[后端] [5] 请求钉钉API: {url}')
headers = {'Content-Type': 'application/json'}
data = {'dept_id': int(dept_id)}
try:
response = requests.post(url, json=data, headers=headers, timeout=10)
result = response.json()
print(f'[后端] [5] 钉钉部门信息响应:', result)
if result.get('errcode') == 0 and result.get('result') and result['result'].get('name'):
print(f'[后端] [5] 获取部门名称成功: {result["result"]["name"]}')
return result['result']['name']
except Exception as e:
print(f'[后端] [5] 异常:', e)
return None
@dingtalk_bp.route('/')
def dingtalk_entry():
print('[后端] [0] ============== dingtalk_entry 请求到达 ==============')
config = get_dingtalk_config()
if not config:
print('[后端] [0] 配置为空,使用默认值')
return render_template('dingtalk_entry.html',
app_key='',
agent_id='',
corp_id='',
target_url='http://localhost:5001/requirement-collection'
), 400
print('[后端] [0] 渲染 dingtalk_entry.html')
return render_template('dingtalk_entry.html',
app_key=config['app_key'],
agent_id=config['agent_id'],
corp_id=config['corp_id'],
target_url=config['target_url']
)
@dingtalk_bp.route('/api/getSignature')
def get_signature():
print('[后端] [S] ============== get_signature 请求到达 ==============')
url = request.args.get('url', '')
print(f'[后端] [S] 收到前端传来的URL: {url}')
if not url:
print('[后端] [S] 缺少url参数')
return jsonify({'error': '缺少url参数'}), 400
config = get_dingtalk_config()
if not config:
print('[后端] [S] 配置为空')
return jsonify({'error': '配置为空'}), 400
jsapi_ticket = get_jsapi_ticket()
if not jsapi_ticket:
print('[后端] [S] 获取jsapi_ticket失败')
return jsonify({'error': '获取jsapi_ticket失败'}), 500
nonce_str = 'dingtalk' + str(int(time.time()))
timestamp = str(int(time.time() * 1000))
signature = generate_signature(jsapi_ticket, nonce_str, timestamp, url)
print(f'[后端] [S] ============== get_signature 完成 ==============')
return jsonify({
'signature': signature,
'nonceStr': nonce_str,
'timeStamp': timestamp,
'agentId': config['agent_id'] and int(config['agent_id']) or 0
})
@dingtalk_bp.route('/api/getDingUser')
def get_ding_user():
print('[后端] [6] ============== get_ding_user 请求到达 ==============')
code = request.args.get('code')
print(f'[后端] [6] 收到 code 参数: {code}')
if not code:
print('[后端] [6] 缺少 code 参数')
return jsonify({'errcode': 400, 'errmsg': '缺少code参数'})
print('[后端] [6] 开始调用 get_user_info_by_code')
user_info = get_user_info_by_code(code)
if not user_info:
print('[后端] [6] 获取用户信息失败')
return jsonify({'errcode': 500, 'errmsg': '获取用户信息失败'})
# 处理新的API响应格式
result = {
'errcode': 0,
'errmsg': 'ok',
'userid': '',
'name': '',
'department': []
}
if user_info.get('errcode') == 0 and user_info.get('result'):
result['userid'] = user_info['result'].get('userid', '')
else:
# 兼容旧格式
result['userid'] = user_info.get('userId', user_info.get('userid', ''))
print(f'[后端] [6] 初步结果: {result}')
if result['userid']:
print('[后端] [6] 有 userid, 调用 get_user_detail 获取详情')
user_detail = get_user_detail(result['userid'])
if user_detail and user_detail.get('errcode') == 0 and user_detail.get('result'):
result['name'] = user_detail['result'].get('name', '')
dept_ids = user_detail['result'].get('dept_id_list', [])
print(f'[后端] [6] 更新用户名为: {result["name"]}')
print(f'[后端] [6] 部门ID列表: {dept_ids}')
# 获取部门名称
if dept_ids:
print('[后端] [6] 开始获取部门名称')
dept_names = []
for dept_id in dept_ids:
name = get_dept_name(str(dept_id))
if name:
dept_names.append(name)
if dept_names:
result['department'] = dept_names
print(f'[后端] [6] 更新部门为: {result["department"]}')
elif user_detail:
# 兼容旧格式
result['name'] = user_detail.get('name', '')
dept_ids = user_detail.get('department', [])
if dept_ids:
dept_names = []
for dept_id in dept_ids:
name = get_dept_name(str(dept_id))
if name:
dept_names.append(name)
if dept_names:
result['department'] = dept_names
print(f'[后端] [6] ============== get_ding_user 完成, 返回: {result} ==============')
return jsonify(result)