"""DingTalk integration blueprint.

Provides helpers that read the DingTalk settings from the Flask config,
fetch access tokens / JSAPI tickets, sign JSAPI requests, look up users and
departments, and push link messages to users, plus the HTTP routes that
expose the entry page, the signature endpoint and the user lookup endpoint.

NOTE(review): the debug ``print`` calls still include access tokens and raw
API responses; only the long-lived app secret is masked here. Consider
moving to ``logging`` with redaction.
"""
import hashlib
import time

import requests
from flask import Blueprint, render_template, jsonify, current_app, request

dingtalk_bp = Blueprint('dingtalk', __name__)

# Fallback page the entry template redirects to when none is configured.
DEFAULT_TARGET_URL = 'http://localhost:5001/requirement-collection'


# Lazy import to avoid a circular import with the models module.
def get_user_model():
    """Return the ``User`` model class, importing it on first use."""
    from models import User
    return User


def _agent_id_as_int(agent_id):
    """Return the configured agent id as an int, or 0 when it is unset."""
    return int(agent_id) if agent_id else 0


def get_dingtalk_config():
    """Read the DingTalk settings from the Flask app config.

    Returns:
        A dict with ``app_key``, ``app_secret``, ``agent_id``, ``corp_id``
        and ``target_url``, or ``None`` when the app key or app secret is
        missing.
    """
    print('[后端] [1] ============== get_dingtalk_config 开始 ==============')
    cfg = current_app.config
    app_key = cfg.get('DINGTALK_APP_KEY', '')
    agent_id = cfg.get('DINGTALK_AGENT_ID', '')
    corp_id = cfg.get('DINGTALK_CORP_ID', '')
    app_secret = cfg.get('DINGTALK_APP_SECRET', '')
    target_url = cfg.get('DINGTALK_TARGET_URL', DEFAULT_TARGET_URL)
    print(f'[后端] [1] 配置读取: app_key={app_key}, agent_id={agent_id}, corp_id={corp_id}')

    if not app_key or not app_secret:
        print('[后端] [1] 缺少 app_key 或 app_secret, 返回 None')
        return None

    result = {
        'app_key': app_key,
        'app_secret': app_secret,
        'agent_id': agent_id,
        'corp_id': corp_id,
        'target_url': target_url,
    }
    # Never write the app secret to the log; print a masked copy instead.
    print('[后端] [1] get_dingtalk_config 完成, 返回:', {**result, 'app_secret': '***'})
    return result


def get_access_token():
    """Fetch an access token from the DingTalk OAuth2 endpoint.

    Returns:
        The ``accessToken`` string, or ``None`` when credentials are missing,
        the request fails, or the response carries no token.
    """
    print('[后端] [2] ============== get_access_token 开始 ==============')
    app_key = current_app.config.get('DINGTALK_APP_KEY')
    app_secret = current_app.config.get('DINGTALK_APP_SECRET')
    if not app_key or not app_secret:
        print('[后端] [2] 缺少 app_key 或 app_secret, 返回 None')
        return None

    url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
    headers = {'Content-Type': 'application/json'}
    data = {'appKey': app_key, 'appSecret': app_secret}
    print(f'[后端] [2] 请求钉钉API: {url}, data: appKey={app_key}')
    try:
        response = requests.post(url, json=data, headers=headers, timeout=10)
        result = response.json()
        print('[后端] [2] 钉钉响应:', result)
        if result.get('accessToken'):
            print('[后端] [2] 获取access_token成功')
            return result['accessToken']
    except Exception as e:
        # Best effort: any network/parse failure is logged and reported as None.
        print('[后端] [2] 异常:', e)
    return None


def get_jsapi_ticket():
    """Fetch a JSAPI ticket via the ``gettoken`` + ``get_jsapi_ticket`` APIs.

    Returns:
        The ticket string, or ``None`` when credentials are missing, a
        request fails, or either response lacks the expected field.
    """
    print('[后端] [2b] ============== get_jsapi_ticket 开始 ==============')
    app_key = current_app.config.get('DINGTALK_APP_KEY')
    app_secret = current_app.config.get('DINGTALK_APP_SECRET')
    if not app_key or not app_secret:
        print('[后端] [2b] 缺少 app_key 或 app_secret, 返回 None')
        return None

    token_url = 'https://oapi.dingtalk.com/gettoken'
    # Log the request without exposing the app secret.
    print(f'[后端] [2b] 请求钉钉API: {token_url}?appkey={app_key}&appsecret=***')
    try:
        # Pass credentials through ``params`` so they are URL-encoded.
        response = requests.get(
            token_url,
            params={'appkey': app_key, 'appsecret': app_secret},
            timeout=10,
        )
        result = response.json()
        print('[后端] [2b] 获取token响应:', result)
        if result.get('access_token'):
            access_token = result['access_token']
            ticket_url = f'https://oapi.dingtalk.com/get_jsapi_ticket?access_token={access_token}'
            print(f'[后端] [2b] 请求ticket API: {ticket_url}')
            ticket_response = requests.get(ticket_url, timeout=10)
            ticket_result = ticket_response.json()
            print('[后端] [2b] 获取ticket响应:', ticket_result)
            if ticket_result.get('ticket'):
                print('[后端] [2b] 获取jsapi_ticket成功')
                return ticket_result['ticket']
    except Exception as e:
        # Best effort: any network/parse failure is logged and reported as None.
        print('[后端] [2b] 异常:', e)
    return None


def generate_signature(jsapi_ticket, nonce_str, timestamp, url):
    """Compute the JSAPI signature for the given page URL.

    The signature is the SHA-1 hex digest of
    ``jsapi_ticket=<ticket>&noncestr=<nonce>&timestamp=<ts>&url=<url>``.

    Args:
        jsapi_ticket: Ticket obtained from ``get_jsapi_ticket``.
        nonce_str: Random string also sent to the front end.
        timestamp: Timestamp string also sent to the front end.
        url: URL of the page that will call the JSAPI.

    Returns:
        The hexadecimal SHA-1 digest as a string.
    """
    print('[后端] [2c] ============== generate_signature 开始 ==============')
    print(f'[后端] [2c] 参数: jsapi_ticket={jsapi_ticket[:20] if jsapi_ticket else "None"}..., nonce_str={nonce_str}, timestamp={timestamp}, url={url}')
    # The separator must be the literal "&timestamp="; the previous text had
    # been corrupted into "×tamp=", which produced invalid signatures.
    sign_str = f'jsapi_ticket={jsapi_ticket}&noncestr={nonce_str}&timestamp={timestamp}&url={url}'
    print(f'[后端] [2c] 签名字符串: {sign_str}')
    signature = hashlib.sha1(sign_str.encode('utf-8')).hexdigest()
    print(f'[后端] [2c] 签名结果: {signature}')
    return signature


def get_user_info_by_code(code):
    """Exchange a temporary auth code for basic user info.

    Args:
        code: Temporary authorization code supplied by the front end.

    Returns:
        The decoded JSON response from DingTalk, or ``None`` when no access
        token is available or the request fails.
    """
    print(f'[后端] [3] ============== get_user_info_by_code 开始, code={code} ==============')
    access_token = get_access_token()
    if not access_token:
        print('[后端] [3] 获取 access_token 失败, 返回 None')
        return None

    # Use the DingTalk micro-app API to resolve the temporary auth code.
    url = f'https://oapi.dingtalk.com/topapi/v2/user/getuserinfo?access_token={access_token}'
    print(f'[后端] [3] 请求钉钉API: {url}')
    headers = {'Content-Type': 'application/json'}
    data = {'code': code}
    try:
        response = requests.post(url, json=data, headers=headers, timeout=10)
        result = response.json()
        print('[后端] [3] 钉钉用户信息响应:', result)
        return result
    except Exception as e:
        print('[后端] [3] 异常:', e)
    return None


def get_user_detail(user_id):
    """Fetch the detailed profile of a DingTalk user.

    Args:
        user_id: DingTalk ``userid`` of the user.

    Returns:
        The decoded JSON response from DingTalk, or ``None`` when no access
        token is available or the request fails.
    """
    print(f'[后端] [4] ============== get_user_detail 开始, user_id={user_id} ==============')
    access_token = get_access_token()
    if not access_token:
        print('[后端] [4] 获取 access_token 失败, 返回 None')
        return None

    # Use the v2 API to fetch user details.
    url = f'https://oapi.dingtalk.com/topapi/v2/user/get?access_token={access_token}'
    print(f'[后端] [4] 请求钉钉API: {url}')
    headers = {'Content-Type': 'application/json'}
    data = {'userid': user_id}
    try:
        response = requests.post(url, json=data, headers=headers, timeout=10)
        result = response.json()
        print('[后端] [4] 钉钉用户详情响应:', result)
        return result
    except Exception as e:
        print('[后端] [4] 异常:', e)
    return None


def get_dept_name(dept_id):
    """Look up a department's name by id.

    Args:
        dept_id: Department id; anything ``int()`` accepts.

    Returns:
        The department name, or ``None`` when no access token is available,
        the id is not numeric, the request fails, or the response has no
        name.
    """
    print(f'[后端] [5] ============== get_dept_name 开始, dept_id={dept_id} ==============')
    access_token = get_access_token()
    if not access_token:
        print('[后端] [5] 获取 access_token 失败, 返回 None')
        return None

    # Use the v2 API to fetch department info.
    url = f'https://oapi.dingtalk.com/topapi/v2/department/get?access_token={access_token}'
    print(f'[后端] [5] 请求钉钉API: {url}')
    headers = {'Content-Type': 'application/json'}
    try:
        # int() lives inside the try so a malformed id yields None like every
        # other failure path instead of raising to the caller.
        data = {'dept_id': int(dept_id)}
        response = requests.post(url, json=data, headers=headers, timeout=10)
        result = response.json()
        print('[后端] [5] 钉钉部门信息响应:', result)
        if result.get('errcode') == 0 and result.get('result') and result['result'].get('name'):
            print(f'[后端] [5] 获取部门名称成功: {result["result"]["name"]}')
            return result['result']['name']
    except Exception as e:
        print('[后端] [5] 异常:', e)
    return None


def send_dingtalk_card_to_user(user_id, title, content, jump_url):
    """Send a DingTalk link message to the given user.

    Args:
        user_id: Value sent as ``userid_list`` in the request payload.
        title: Message title.
        content: Message body text.
        jump_url: URL opened when the message is clicked.

    Returns:
        ``True`` when DingTalk answers with ``errcode == 0``; ``False`` when
        no access token is available, the request fails, or DingTalk reports
        an error.
    """
    print(f'[后端] [N] ============== send_dingtalk_card_to_user 开始, user_id={user_id} ==============')
    access_token = get_access_token()
    if not access_token:
        print('[后端] [N] 获取 access_token 失败')
        return False

    url = f'https://oapi.dingtalk.com/topapi/message/corpconversation/asyncsend_v2?access_token={access_token}'
    headers = {'Content-Type': 'application/json'}
    agent_id = current_app.config.get('DINGTALK_AGENT_ID', '')
    data = {
        'agent_id': _agent_id_as_int(agent_id),
        'userid_list': user_id,
        'msg': {
            'msgtype': 'link',
            'link': {
                'title': title,
                'text': content,
                'picUrl': 'https://img.alicdn.com/tfs/TB1NwmBkeL2gK0jSZPhXXahviXa-72-72.png',
                'messageUrl': jump_url,
            },
        },
    }
    print(f'[后端] [N] 发送消息卡片: {data}')
    try:
        response = requests.post(url, json=data, headers=headers, timeout=10)
        result = response.json()
        print('[后端] [N] 钉钉响应:', result)
        if result.get('errcode') == 0:
            print('[后端] [N] 消息发送成功')
            return True
        print(f'[后端] [N] 消息发送失败: {result.get("errmsg", "未知错误")}')
        return False
    except Exception as e:
        print('[后端] [N] 异常:', e)
        return False


def notify_admins_new_demand(demand, base_url):
    """Notify every admin user that a new demand was submitted.

    Admins without a ``dingtalk_userid`` are skipped.

    Args:
        demand: Object exposing ``title``, ``contact``, ``branch`` and ``id``.
        base_url: Site base URL used to build the answer-page link.
    """
    print('[后端] [N] ============== notify_admins_new_demand 开始 ==============')
    # Fetch all admin users (via the shared lazy-import helper).
    User = get_user_model()
    admins = User.query.filter_by(role='admin').all()
    print(f'[后端] [N] 找到 {len(admins)} 个管理员')
    if not admins:
        print('[后端] [N] 没有管理员用户,跳过通知')
        return

    # Build the message content.
    title = f'📋 新需求提交:{demand.title}'
    content = '\n'.join([
        f'需求标题:{demand.title}',
        f'提交者:{demand.contact or "未知"}',
        f'分会:{demand.branch}',
        '点击查看并回答',
    ])
    # Link to the answer page for this demand.
    jump_url = f'{base_url}/demand/{demand.id}/answer'

    success_count = 0
    for admin in admins:
        if admin.dingtalk_userid and send_dingtalk_card_to_user(
                admin.dingtalk_userid, title, content, jump_url):
            success_count += 1
    print(f'[后端] [N] 消息发送完成,成功: {success_count}/{len(admins)}')


def notify_asker_answer(demand, base_url):
    """Notify the user who submitted a demand that it has been answered.

    Does nothing when the asker cannot be found or has no
    ``dingtalk_userid``.

    Args:
        demand: Object exposing ``user_id`` and ``title``.
        base_url: Site base URL used to build the link.
    """
    print('[后端] [N] ============== notify_asker_answer 开始 ==============')
    # Look up the asker.
    User = get_user_model()
    asker = User.query.get(demand.user_id)
    if not asker:
        print('[后端] [N] 未找到提问者,跳过通知')
        return
    if not asker.dingtalk_userid:
        print(f'[后端] [N] 提问者 {asker.username} 没有钉钉userid,跳过通知')
        return

    # Build the message content.
    title = f'✅ 您的需求已被回答:{demand.title}'
    content = '\n'.join([
        f'需求标题:{demand.title}',
        '管理员已回复您的问题,点击查看详情',
    ])
    # Link target: the DingTalk entry page.
    jump_url = f'{base_url}/dingtalk'

    print(f'[后端] [N] 准备通知提问者: {asker.username}, dingtalk_userid: {asker.dingtalk_userid}')
    if send_dingtalk_card_to_user(asker.dingtalk_userid, title, content, jump_url):
        print('[后端] [N] 提问者通知发送成功')
    else:
        print('[后端] [N] 提问者通知发送失败')


@dingtalk_bp.route('/')
def dingtalk_entry():
    """Render the DingTalk entry page.

    Responds with HTTP 400 (and empty identifiers) when the DingTalk
    configuration is incomplete.
    """
    print('[后端] [0] ============== dingtalk_entry 请求到达 ==============')
    config = get_dingtalk_config()
    if not config:
        print('[后端] [0] 配置为空,使用默认值')
        return render_template(
            'dingtalk_entry.html',
            app_key='',
            agent_id='',
            corp_id='',
            target_url=DEFAULT_TARGET_URL,
        ), 400

    print('[后端] [0] 渲染 dingtalk_entry.html')
    return render_template(
        'dingtalk_entry.html',
        app_key=config['app_key'],
        agent_id=config['agent_id'],
        corp_id=config['corp_id'],
        target_url=config['target_url'],
    )


@dingtalk_bp.route('/api/getSignature')
def get_signature():
    """Return the JSAPI signature parameters for the ``url`` query argument.

    Responds 400 when ``url`` is missing or the configuration is incomplete,
    and 500 when no JSAPI ticket could be obtained.
    """
    print('[后端] [S] ============== get_signature 请求到达 ==============')
    url = request.args.get('url', '')
    print(f'[后端] [S] 收到前端传来的URL: {url}')
    if not url:
        print('[后端] [S] 缺少url参数')
        return jsonify({'error': '缺少url参数'}), 400

    config = get_dingtalk_config()
    if not config:
        print('[后端] [S] 配置为空')
        return jsonify({'error': '配置为空'}), 400

    jsapi_ticket = get_jsapi_ticket()
    if not jsapi_ticket:
        print('[后端] [S] 获取jsapi_ticket失败')
        return jsonify({'error': '获取jsapi_ticket失败'}), 500

    # Read the clock once so the nonce and timestamp describe the same instant.
    now = time.time()
    nonce_str = 'dingtalk' + str(int(now))
    timestamp = str(int(now * 1000))
    signature = generate_signature(jsapi_ticket, nonce_str, timestamp, url)
    print('[后端] [S] ============== get_signature 完成 ==============')
    return jsonify({
        'signature': signature,
        'nonceStr': nonce_str,
        'timeStamp': timestamp,
        'agentId': _agent_id_as_int(config['agent_id']),
    })


def _collect_dept_names(dept_ids):
    """Resolve department ids to names, dropping ids whose lookup fails."""
    names = (get_dept_name(str(dept_id)) for dept_id in dept_ids)
    return [name for name in names if name]


@dingtalk_bp.route('/api/getDingUser')
def get_ding_user():
    """Resolve the ``code`` query argument to a user id, name and departments.

    Always answers HTTP 200 with a JSON body; failures are signalled through
    the ``errcode``/``errmsg`` fields.
    """
    print('[后端] [6] ============== get_ding_user 请求到达 ==============')
    code = request.args.get('code')
    print(f'[后端] [6] 收到 code 参数: {code}')
    if not code:
        print('[后端] [6] 缺少 code 参数')
        return jsonify({'errcode': 400, 'errmsg': '缺少code参数'})

    print('[后端] [6] 开始调用 get_user_info_by_code')
    user_info = get_user_info_by_code(code)
    if not user_info:
        print('[后端] [6] 获取用户信息失败')
        return jsonify({'errcode': 500, 'errmsg': '获取用户信息失败'})

    result = {
        'errcode': 0,
        'errmsg': 'ok',
        'userid': '',
        'name': '',
        'department': [],
    }
    # Handle the v2 response shape first, then fall back to the legacy one.
    # NOTE(review): a DingTalk error response ends up here with an empty
    # userid but errcode 0 — confirm the front end handles that case.
    if user_info.get('errcode') == 0 and user_info.get('result'):
        result['userid'] = user_info['result'].get('userid', '')
    else:
        result['userid'] = user_info.get('userId', user_info.get('userid', ''))
    print(f'[后端] [6] 初步结果: {result}')

    if result['userid']:
        print('[后端] [6] 有 userid, 调用 get_user_detail 获取详情')
        user_detail = get_user_detail(result['userid'])
        if user_detail and user_detail.get('errcode') == 0 and user_detail.get('result'):
            profile = user_detail['result']
            result['name'] = profile.get('name', '')
            dept_ids = profile.get('dept_id_list', [])
            print(f'[后端] [6] 更新用户名为: {result["name"]}')
            print(f'[后端] [6] 部门ID列表: {dept_ids}')
            # Resolve department names.
            if dept_ids:
                print('[后端] [6] 开始获取部门名称')
                dept_names = _collect_dept_names(dept_ids)
                if dept_names:
                    result['department'] = dept_names
                    print(f'[后端] [6] 更新部门为: {result["department"]}')
        elif user_detail:
            # Legacy response shape: fields live at the top level.
            result['name'] = user_detail.get('name', '')
            dept_names = _collect_dept_names(user_detail.get('department') or [])
            if dept_names:
                result['department'] = dept_names

    print(f'[后端] [6] ============== get_ding_user 完成, 返回: {result} ==============')
    return jsonify(result)