"""Flask blueprint for the DingTalk micro-app entry page and its helper APIs.

Routes registered on ``dingtalk_bp``:

* ``/``                 -- renders ``dingtalk_entry.html`` with the app config.
* ``/api/getSignature`` -- returns the JSAPI signature parameters for a URL.
* ``/api/getDingUser``  -- exchanges a temporary auth code for user info.

Credentials are read from ``current_app.config`` (``DINGTALK_APP_KEY``,
``DINGTALK_APP_SECRET``, ``DINGTALK_AGENT_ID``, ``DINGTALK_CORP_ID``,
``DINGTALK_TARGET_URL``).
"""
import hashlib
import time

import requests
from flask import Blueprint, render_template, jsonify, current_app, request

dingtalk_bp = Blueprint('dingtalk', __name__)

# Shared settings for every outbound DingTalk HTTP call.
_REQUEST_TIMEOUT = 10
_JSON_HEADERS = {'Content-Type': 'application/json'}
_DEFAULT_TARGET_URL = 'http://localhost:5001/requirement-collection'


def _redacted(mapping, *secret_keys):
    """Return a shallow copy of *mapping* with *secret_keys* masked for logging.

    Non-dict values are returned unchanged so the caller can log whatever the
    remote API answered.
    """
    if not isinstance(mapping, dict):
        return mapping
    return {
        key: ('***' if key in secret_keys else value)
        for key, value in mapping.items()
    }


def _safe_int(value, default=0):
    """Convert *value* to ``int``; return *default* when it is empty or invalid."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


def _topapi_post(step, path, payload, response_label):
    """POST *payload* to ``oapi.dingtalk.com/topapi/v2/<path>`` and return the JSON.

    Args:
        step: Log tag identifying the calling step (e.g. ``'3'``).
        path: Path below ``/topapi/v2/``.
        payload: JSON body to send.
        response_label: Text used when logging the decoded response.

    Returns:
        The decoded JSON response, or ``None`` when no access token could be
        obtained or the request/decoding failed.
    """
    access_token = get_access_token()
    if not access_token:
        print(f'[后端] [{step}] 获取 access_token 失败, 返回 None')
        return None

    url = f'https://oapi.dingtalk.com/topapi/v2/{path}'
    # The token travels in ``params`` so it never ends up in the log line.
    print(f'[后端] [{step}] 请求钉钉API: {url}')
    try:
        response = requests.post(
            url,
            params={'access_token': access_token},
            json=payload,
            headers=_JSON_HEADERS,
            timeout=_REQUEST_TIMEOUT,
        )
        result = response.json()
    except Exception as e:  # best-effort boundary: log and report failure as None
        print(f'[后端] [{step}] 异常:', e)
        return None
    print(f'[后端] [{step}] {response_label}:', result)
    return result


def _lookup_dept_names(dept_ids):
    """Resolve each department id to its name, skipping ids that cannot be resolved."""
    # NOTE: every lookup requests a fresh access token via get_dept_name().
    names = (get_dept_name(str(dept_id)) for dept_id in dept_ids or [])
    return [name for name in names if name]


def get_dingtalk_config():
    """Read the DingTalk settings from the Flask app config.

    Returns:
        A dict with ``app_key``, ``app_secret``, ``agent_id``, ``corp_id`` and
        ``target_url``, or ``None`` when the app key or app secret is missing.
    """
    print('[后端] [1] ============== get_dingtalk_config 开始 ==============')
    cfg = current_app.config
    app_key = cfg.get('DINGTALK_APP_KEY', '')
    agent_id = cfg.get('DINGTALK_AGENT_ID', '')
    corp_id = cfg.get('DINGTALK_CORP_ID', '')
    app_secret = cfg.get('DINGTALK_APP_SECRET', '')
    target_url = cfg.get('DINGTALK_TARGET_URL', _DEFAULT_TARGET_URL)
    print(f'[后端] [1] 配置读取: app_key={app_key}, agent_id={agent_id}, corp_id={corp_id}')

    if not app_key or not app_secret:
        print('[后端] [1] 缺少 app_key 或 app_secret, 返回 None')
        return None

    result = {
        'app_key': app_key,
        'app_secret': app_secret,
        'agent_id': agent_id,
        'corp_id': corp_id,
        'target_url': target_url,
    }
    # Never write the app secret to the log.
    print('[后端] [1] get_dingtalk_config 完成, 返回:', _redacted(result, 'app_secret'))
    return result


def get_access_token():
    """Fetch an access token from the DingTalk v1.0 OAuth2 endpoint.

    Returns:
        The ``accessToken`` string, or ``None`` when credentials are missing,
        the request fails, or the response carries no token.
    """
    print('[后端] [2] ============== get_access_token 开始 ==============')
    app_key = current_app.config.get('DINGTALK_APP_KEY')
    app_secret = current_app.config.get('DINGTALK_APP_SECRET')
    if not app_key or not app_secret:
        print('[后端] [2] 缺少 app_key 或 app_secret, 返回 None')
        return None

    url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
    data = {'appKey': app_key, 'appSecret': app_secret}
    print(f'[后端] [2] 请求钉钉API: {url}, data: appKey={app_key}')
    try:
        response = requests.post(
            url, json=data, headers=_JSON_HEADERS, timeout=_REQUEST_TIMEOUT
        )
        result = response.json()
        print('[后端] [2] 钉钉响应:', _redacted(result, 'accessToken'))
        if result.get('accessToken'):
            print('[后端] [2] 获取access_token成功')
            return result['accessToken']
    except Exception as e:  # best-effort boundary: log and report failure as None
        print('[后端] [2] 异常:', e)
    return None


def get_jsapi_ticket():
    """Fetch a JSAPI ticket via the ``oapi.dingtalk.com`` token and ticket endpoints.

    Returns:
        The ``ticket`` string, or ``None`` when credentials are missing, a
        request fails, or either response lacks the expected field.
    """
    print('[后端] [2b] ============== get_jsapi_ticket 开始 ==============')
    app_key = current_app.config.get('DINGTALK_APP_KEY')
    app_secret = current_app.config.get('DINGTALK_APP_SECRET')
    if not app_key or not app_secret:
        print('[后端] [2b] 缺少 app_key 或 app_secret, 返回 None')
        return None

    # Credentials are passed through ``params`` so that the logged URLs do not
    # contain the app secret or the access token.
    token_url = 'https://oapi.dingtalk.com/gettoken'
    print(f'[后端] [2b] 请求钉钉API: {token_url}')
    try:
        response = requests.get(
            token_url,
            params={'appkey': app_key, 'appsecret': app_secret},
            timeout=_REQUEST_TIMEOUT,
        )
        result = response.json()
        print('[后端] [2b] 获取token响应:', _redacted(result, 'access_token'))
        if result.get('access_token'):
            ticket_url = 'https://oapi.dingtalk.com/get_jsapi_ticket'
            print(f'[后端] [2b] 请求ticket API: {ticket_url}')
            ticket_response = requests.get(
                ticket_url,
                params={'access_token': result['access_token']},
                timeout=_REQUEST_TIMEOUT,
            )
            ticket_result = ticket_response.json()
            print('[后端] [2b] 获取ticket响应:', ticket_result)
            if ticket_result.get('ticket'):
                print('[后端] [2b] 获取jsapi_ticket成功')
                return ticket_result['ticket']
    except Exception as e:  # best-effort boundary: log and report failure as None
        print('[后端] [2b] 异常:', e)
    return None


def generate_signature(jsapi_ticket, nonce_str, timestamp, url):
    """Return the SHA-1 hex digest of the JSAPI signing string.

    The signing string is
    ``jsapi_ticket=<ticket>&noncestr=<nonce>&timestamp=<ts>&url=<url>``.

    Args:
        jsapi_ticket: Ticket obtained from ``get_jsapi_ticket``.
        nonce_str: Random string that is also returned to the frontend.
        timestamp: Timestamp string that is also returned to the frontend.
        url: URL of the page being signed.
    """
    print('[后端] [2c] ============== generate_signature 开始 ==============')
    print(f'[后端] [2c] 参数: jsapi_ticket={jsapi_ticket[:20] if jsapi_ticket else "None"}..., nonce_str={nonce_str}, timestamp={timestamp}, url={url}')
    # The key must be the literal "&timestamp="; the previous text had been
    # mangled into "×tamp=", which produced a signature that could not match.
    sign_str = f'jsapi_ticket={jsapi_ticket}&noncestr={nonce_str}&timestamp={timestamp}&url={url}'
    print(f'[后端] [2c] 签名字符串: {sign_str}')
    signature = hashlib.sha1(sign_str.encode('utf-8')).hexdigest()
    print(f'[后端] [2c] 签名结果: {signature}')
    return signature


def get_user_info_by_code(code):
    """Exchange a temporary auth *code* for user info.

    Returns:
        The decoded JSON response of ``topapi/v2/user/getuserinfo``, or
        ``None`` on failure.
    """
    print(f'[后端] [3] ============== get_user_info_by_code 开始, code={code} ==============')
    # Use the DingTalk micro-app API to exchange the temporary auth code for user info.
    return _topapi_post('3', 'user/getuserinfo', {'code': code}, '钉钉用户信息响应')


def get_user_detail(user_id):
    """Fetch the user detail record for *user_id*.

    Returns:
        The decoded JSON response of ``topapi/v2/user/get``, or ``None`` on
        failure.
    """
    print(f'[后端] [4] ============== get_user_detail 开始, user_id={user_id} ==============')
    # Use the new-version API to fetch the user details.
    return _topapi_post('4', 'user/get', {'userid': user_id}, '钉钉用户详情响应')


def get_dept_name(dept_id):
    """Return the name of department *dept_id*.

    Returns:
        The department name, or ``None`` when *dept_id* is not an integer, the
        request fails, or the response reports an error or has no name.
    """
    print(f'[后端] [5] ============== get_dept_name 开始, dept_id={dept_id} ==============')
    try:
        numeric_id = int(dept_id)
    except (TypeError, ValueError) as e:
        # A malformed id is treated like any other lookup failure.
        print('[后端] [5] 异常:', e)
        return None

    # Use the new-version API to fetch the department info.
    result = _topapi_post('5', 'department/get', {'dept_id': numeric_id}, '钉钉部门信息响应')
    if not isinstance(result, dict):
        return None
    info = result.get('result')
    if result.get('errcode') == 0 and isinstance(info, dict) and info.get('name'):
        print(f'[后端] [5] 获取部门名称成功: {info["name"]}')
        return info['name']
    return None


@dingtalk_bp.route('/')
def dingtalk_entry():
    """Render the DingTalk entry page.

    Responds with HTTP 400 and an empty-config page when the DingTalk
    credentials are not configured.
    """
    print('[后端] [0] ============== dingtalk_entry 请求到达 ==============')
    config = get_dingtalk_config()
    if not config:
        print('[后端] [0] 配置为空,使用默认值')
        return render_template(
            'dingtalk_entry.html',
            app_key='',
            agent_id='',
            corp_id='',
            target_url=_DEFAULT_TARGET_URL,
        ), 400

    print('[后端] [0] 渲染 dingtalk_entry.html')
    return render_template(
        'dingtalk_entry.html',
        app_key=config['app_key'],
        agent_id=config['agent_id'],
        corp_id=config['corp_id'],
        target_url=config['target_url'],
    )


@dingtalk_bp.route('/api/getSignature')
def get_signature():
    """Return the JSAPI signature parameters for the ``url`` query argument.

    Responds with 400 when ``url`` or the configuration is missing and with
    500 when no JSAPI ticket could be obtained.
    """
    print('[后端] [S] ============== get_signature 请求到达 ==============')
    url = request.args.get('url', '')
    print(f'[后端] [S] 收到前端传来的URL: {url}')
    if not url:
        print('[后端] [S] 缺少url参数')
        return jsonify({'error': '缺少url参数'}), 400

    config = get_dingtalk_config()
    if not config:
        print('[后端] [S] 配置为空')
        return jsonify({'error': '配置为空'}), 400

    jsapi_ticket = get_jsapi_ticket()
    if not jsapi_ticket:
        print('[后端] [S] 获取jsapi_ticket失败')
        return jsonify({'error': '获取jsapi_ticket失败'}), 500

    now = time.time()
    nonce_str = 'dingtalk' + str(int(now))
    timestamp = str(int(now * 1000))
    signature = generate_signature(jsapi_ticket, nonce_str, timestamp, url)
    print('[后端] [S] ============== get_signature 完成 ==============')
    return jsonify({
        'signature': signature,
        'nonceStr': nonce_str,
        'timeStamp': timestamp,
        # An empty or non-numeric agent id is reported as 0.
        'agentId': _safe_int(config['agent_id']),
    })


@dingtalk_bp.route('/api/getDingUser')
def get_ding_user():
    """Resolve the ``code`` query argument to a user id, name and department names.

    Always answers with a JSON object carrying ``errcode``/``errmsg``; on
    success it also contains ``userid``, ``name`` and ``department``.
    """
    print('[后端] [6] ============== get_ding_user 请求到达 ==============')
    code = request.args.get('code')
    print(f'[后端] [6] 收到 code 参数: {code}')
    if not code:
        print('[后端] [6] 缺少 code 参数')
        return jsonify({'errcode': 400, 'errmsg': '缺少code参数'})

    print('[后端] [6] 开始调用 get_user_info_by_code')
    user_info = get_user_info_by_code(code)
    if not user_info:
        print('[后端] [6] 获取用户信息失败')
        return jsonify({'errcode': 500, 'errmsg': '获取用户信息失败'})

    # Handle the new API response format.
    result = {
        'errcode': 0,
        'errmsg': 'ok',
        'userid': '',
        'name': '',
        'department': [],
    }
    if user_info.get('errcode') == 0 and user_info.get('result'):
        result['userid'] = user_info['result'].get('userid', '')
    else:
        # Legacy response format.
        # NOTE(review): a DingTalk error response also lands here and is
        # reported as errcode 0 with an empty userid -- confirm whether the
        # frontend relies on that.
        result['userid'] = user_info.get('userId', user_info.get('userid', ''))
    print(f'[后端] [6] 初步结果: {result}')

    if result['userid']:
        print('[后端] [6] 有 userid, 调用 get_user_detail 获取详情')
        user_detail = get_user_detail(result['userid'])
        if user_detail and user_detail.get('errcode') == 0 and user_detail.get('result'):
            detail = user_detail['result']
            result['name'] = detail.get('name', '')
            dept_ids = detail.get('dept_id_list', [])
            print(f'[后端] [6] 更新用户名为: {result["name"]}')
            print(f'[后端] [6] 部门ID列表: {dept_ids}')
            # Fetch the department names.
            if dept_ids:
                print('[后端] [6] 开始获取部门名称')
                dept_names = _lookup_dept_names(dept_ids)
                if dept_names:
                    result['department'] = dept_names
                    print(f'[后端] [6] 更新部门为: {result["department"]}')
        elif user_detail:
            # Legacy response format.
            result['name'] = user_detail.get('name', '')
            dept_names = _lookup_dept_names(user_detail.get('department', []))
            if dept_names:
                result['department'] = dept_names

    print(f'[后端] [6] ============== get_ding_user 完成, 返回: {result} ==============')
    return jsonify(result)