bluewinliang committed on
Commit
c2a4afc
·
verified ·
1 Parent(s): 244ffde

Upload proxy_handler.py

Browse files
Files changed (1) hide show
  1. proxy_handler.py +86 -85
proxy_handler.py CHANGED
@@ -1,5 +1,5 @@
1
  """
2
- Proxy handler for Z.AI API requests
3
  """
4
  import json, logging, re, time, uuid
5
  from typing import AsyncGenerator, Dict, Any, Tuple
@@ -8,12 +8,13 @@ import httpx
8
  from fastapi import HTTPException
9
  from fastapi.responses import StreamingResponse
10
 
11
- # 這些導入現在可以確定是正確的
12
  from config import settings
13
  from cookie_manager import cookie_manager
14
  from models import ChatCompletionRequest, ChatCompletionResponse
15
 
16
  logger = logging.getLogger(__name__)
 
 
17
 
18
 
19
  class ProxyHandler:
@@ -28,7 +29,7 @@ class ProxyHandler:
28
  if not self.client.is_closed:
29
  await self.client.aclose()
30
 
31
- # --- Text utilities and other methods from the last robust version ---
32
  def _clean_thinking(self, s: str) -> str:
33
  if not s: return ""
34
  s = re.sub(r'<details[^>]*>.*?</details>', '', s, flags=re.DOTALL)
@@ -39,6 +40,8 @@ class ProxyHandler:
39
  def _clean_answer(self, s: str) -> str:
40
  if not s: return ""
41
  return re.sub(r"<details[^>]*>.*?</details>", "", s, flags=re.DOTALL)
 
 
42
  def _serialize_msgs(self, msgs) -> list:
43
  out = []
44
  for m in msgs:
@@ -55,100 +58,67 @@ class ProxyHandler:
55
  headers = { "Content-Type": "application/json", "Authorization": f"Bearer {ck}", "User-Agent": ("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"), "Accept": "application/json, text/event-stream", "Accept-Language": "zh-CN", "sec-ch-ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"macOS"', "x-fe-version": "prod-fe-1.0.53", "Origin": "https://chat.z.ai", "Referer": "https://chat.z.ai/",}
56
  return body, headers, ck
57
 
58
- # ---------- stream ----------
59
  async def stream_proxy_response(self, req: ChatCompletionRequest) -> AsyncGenerator[str, None]:
 
 
60
  ck = None
61
  try:
62
  body, headers, ck = await self._prep_upstream(req)
63
- comp_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
64
- think_open = False
65
- phase_cur = None
66
-
67
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
68
  if resp.status_code != 200:
69
- # FIX: Using the correct method name `mark_cookie_failed`
70
  await cookie_manager.mark_cookie_failed(ck)
71
  err_body = await resp.aread(); err_msg = f"Error: {resp.status_code} - {err_body.decode(errors='ignore')}"
72
  err = {"id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": req.model, "choices": [{"index": 0, "delta": {"content": err_msg}, "finish_reason": "stop"}],}
73
  yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"; return
74
-
75
- # If the request was successful, the cookie is good.
76
  await cookie_manager.mark_cookie_success(ck)
77
-
78
- # The robust logic for handling chunks from the last attempt
79
  async for raw in resp.aiter_text():
80
  for line in raw.strip().split('\n'):
81
  line = line.strip()
82
  if not line or not line.startswith('data: '): continue
83
  payload_str = line[6:]
84
  if payload_str == '[DONE]':
85
- if think_open:
86
- payload = {'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]}
87
- yield f"data: {json.dumps(payload)}\n\n"
88
- payload = {'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]}
89
- yield f"data: {json.dumps(payload)}\n\n"; yield "data: [DONE]\n\n"; return
90
- try:
91
- dat = json.loads(payload_str).get("data", {})
92
  except (json.JSONDecodeError, AttributeError): continue
93
-
94
- delta = dat.get("delta_content", "")
95
- new_phase = dat.get("phase")
96
-
97
  is_transition = new_phase and new_phase != phase_cur
98
  if is_transition:
99
  if phase_cur == "thinking" and new_phase == "answer" and think_open and settings.SHOW_THINK_TAGS:
100
- close_payload = {'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]}
101
- yield f"data: {json.dumps(close_payload)}\n\n"
102
  think_open = False
103
  phase_cur = new_phase
104
-
105
  current_content_phase = phase_cur or new_phase
106
-
107
  text_to_yield = ""
108
  if current_content_phase == "thinking":
109
  if delta and settings.SHOW_THINK_TAGS:
110
  if not think_open:
111
- open_payload = {'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '<think>'}, 'finish_reason': None}]}
112
- yield f"data: {json.dumps(open_payload)}\n\n"
113
  think_open = True
114
  text_to_yield = self._clean_thinking(delta)
115
- elif current_content_phase == "answer":
116
- text_to_yield = self._clean_answer(delta)
117
-
118
- if text_to_yield:
119
- content_payload = {"id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": req.model, "choices": [{"index": 0, "delta": {"content": text_to_yield}, "finish_reason": None}],}
120
- yield f"data: {json.dumps(content_payload)}\n\n"
121
- except httpx.RequestError as e:
122
- if ck:
123
- # FIX: Using the correct method name `mark_cookie_failed`
124
- await cookie_manager.mark_cookie_failed(ck)
125
- logger.error(f"Request error: {e}"); err_msg = f"Connection error: {e}"
126
- err = {"id": f"chatcmpl-{uuid.uuid4().hex[:29]}", "choices": [{"delta": {"content": err_msg}, "finish_reason": "stop"}]}
127
- yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"
128
- except Exception:
129
- logger.exception("Unexpected error in stream_proxy_response")
130
- err = {"id": f"chatcmpl-{uuid.uuid4().hex[:29]}", "choices": [{"delta": {"content": "Internal error in stream"}, "finish_reason": "stop"}]}
131
- yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"
132
 
133
- # ---------- non-stream ----------
134
  async def non_stream_proxy_response(self, req: ChatCompletionRequest) -> ChatCompletionResponse:
 
135
  ck = None
136
  try:
137
  body, headers, ck = await self._prep_upstream(req)
138
- full_content = []
139
  phase_cur = None
140
 
141
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
142
  if resp.status_code != 200:
143
- # FIX: Using the correct method name `mark_cookie_failed`
144
  await cookie_manager.mark_cookie_failed(ck)
145
  error_detail = await resp.text()
146
  raise HTTPException(resp.status_code, f"Upstream error: {error_detail}")
147
 
148
- # If the request was successful, the cookie is good.
149
  await cookie_manager.mark_cookie_success(ck)
150
 
151
- # The robust logic for collecting chunks from the last attempt
152
  async for raw in resp.aiter_text():
153
  for line in raw.strip().split('\n'):
154
  line = line.strip()
@@ -157,52 +127,83 @@ class ProxyHandler:
157
  if payload_str == '[DONE]': break
158
  try:
159
  dat = json.loads(payload_str).get("data", {})
 
160
  except (json.JSONDecodeError, AttributeError): continue
161
-
162
- delta = dat.get("delta_content")
163
- new_phase = dat.get("phase")
164
-
165
- if new_phase:
166
- phase_cur = new_phase
167
-
168
- if delta and phase_cur:
169
- full_content.append((phase_cur, delta))
170
  else: continue
171
  break
 
 
 
 
 
 
 
 
 
 
 
 
172
 
173
- think_buf = []
174
- answer_buf = []
175
- for phase, content in full_content:
176
- if phase == "thinking":
177
- think_buf.append(self._clean_thinking(content))
178
- elif phase == "answer":
179
- answer_buf.append(self._clean_answer(content))
 
 
 
 
 
 
180
 
181
- ans_text = ''.join(answer_buf)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
182
  final_content = ans_text
 
 
 
 
 
 
183
 
184
- if settings.SHOW_THINK_TAGS and think_buf:
185
- think_text = ''.join(think_buf).strip()
186
- if think_text:
187
- newline = "\n" if ans_text and not ans_text.startswith(('\n', '\r')) else ""
188
- final_content = f"<think>{think_text}</think>{newline}{ans_text}"
189
 
190
  return ChatCompletionResponse(
191
  id=f"chatcmpl-{uuid.uuid4().hex[:29]}", created=int(time.time()), model=req.model,
192
  choices=[{"index": 0, "message": {"role": "assistant", "content": final_content}, "finish_reason": "stop"}],
193
  )
194
- except httpx.RequestError as e:
195
- if ck:
196
- # FIX: Using the correct method name `mark_cookie_failed`
197
- await cookie_manager.mark_cookie_failed(ck)
198
- logger.error(f"Non-stream request error: {e}"); raise HTTPException(502, f"Connection error: {e}")
199
  except Exception:
200
- logger.exception("Non-stream unexpected error"); raise HTTPException(500, "Internal server error")
 
201
 
202
- # FastAPI entry point remains the same
203
  async def handle_chat_completion(self, req: ChatCompletionRequest):
204
- stream = bool(req.stream) if req.stream is not None else settings.DEFAULT_STREAM
205
- if stream:
206
- return StreamingResponse(self.stream_proxy_response(req), media_type="text/event-stream",
207
- headers={"Cache-Control": "no-cache", "Connection": "keep-alive"})
 
 
208
  return await self.non_stream_proxy_response(req)
 
1
  """
2
+ Proxy handler for Z.AI API requests - DEBUG VERSION
3
  """
4
  import json, logging, re, time, uuid
5
  from typing import AsyncGenerator, Dict, Any, Tuple
 
8
  from fastapi import HTTPException
9
  from fastapi.responses import StreamingResponse
10
 
 
11
  from config import settings
12
  from cookie_manager import cookie_manager
13
  from models import ChatCompletionRequest, ChatCompletionResponse
14
 
15
  logger = logging.getLogger(__name__)
16
+ # Set logger to DEBUG level to ensure all our messages are captured
17
+ logger.setLevel(logging.DEBUG)
18
 
19
 
20
  class ProxyHandler:
 
29
  if not self.client.is_closed:
30
  await self.client.aclose()
31
 
32
+ # --- Text utilities ---
33
  def _clean_thinking(self, s: str) -> str:
34
  if not s: return ""
35
  s = re.sub(r'<details[^>]*>.*?</details>', '', s, flags=re.DOTALL)
 
40
  def _clean_answer(self, s: str) -> str:
41
  if not s: return ""
42
  return re.sub(r"<details[^>]*>.*?</details>", "", s, flags=re.DOTALL)
43
+
44
+ # --- Other methods ---
45
  def _serialize_msgs(self, msgs) -> list:
46
  out = []
47
  for m in msgs:
 
58
  headers = { "Content-Type": "application/json", "Authorization": f"Bearer {ck}", "User-Agent": ("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"), "Accept": "application/json, text/event-stream", "Accept-Language": "zh-CN", "sec-ch-ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"macOS"', "x-fe-version": "prod-fe-1.0.53", "Origin": "https://chat.z.ai", "Referer": "https://chat.z.ai/",}
59
  return body, headers, ck
60
 
61
+ # Streaming response is left as-is for now, we focus on non-stream for debugging
62
  async def stream_proxy_response(self, req: ChatCompletionRequest) -> AsyncGenerator[str, None]:
63
+ # This function remains the same as the previous correct version.
64
+ # The focus of debugging is on the non-stream version.
65
  ck = None
66
  try:
67
  body, headers, ck = await self._prep_upstream(req)
68
+ comp_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"; think_open = False; phase_cur = None
 
 
 
69
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
70
  if resp.status_code != 200:
 
71
  await cookie_manager.mark_cookie_failed(ck)
72
  err_body = await resp.aread(); err_msg = f"Error: {resp.status_code} - {err_body.decode(errors='ignore')}"
73
  err = {"id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": req.model, "choices": [{"index": 0, "delta": {"content": err_msg}, "finish_reason": "stop"}],}
74
  yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"; return
 
 
75
  await cookie_manager.mark_cookie_success(ck)
 
 
76
  async for raw in resp.aiter_text():
77
  for line in raw.strip().split('\n'):
78
  line = line.strip()
79
  if not line or not line.startswith('data: '): continue
80
  payload_str = line[6:]
81
  if payload_str == '[DONE]':
82
+ if think_open: yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]})}\n\n"
83
+ yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]})}\n\n"; yield "data: [DONE]\n\n"; return
84
+ try: dat = json.loads(payload_str).get("data", {})
 
 
 
 
85
  except (json.JSONDecodeError, AttributeError): continue
86
+ delta = dat.get("delta_content", ""); new_phase = dat.get("phase")
 
 
 
87
  is_transition = new_phase and new_phase != phase_cur
88
  if is_transition:
89
  if phase_cur == "thinking" and new_phase == "answer" and think_open and settings.SHOW_THINK_TAGS:
90
+ yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]})}\n\n"
 
91
  think_open = False
92
  phase_cur = new_phase
 
93
  current_content_phase = phase_cur or new_phase
 
94
  text_to_yield = ""
95
  if current_content_phase == "thinking":
96
  if delta and settings.SHOW_THINK_TAGS:
97
  if not think_open:
98
+ yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '<think>'}, 'finish_reason': None}]})}\n\n"
 
99
  think_open = True
100
  text_to_yield = self._clean_thinking(delta)
101
+ elif current_content_phase == "answer": text_to_yield = self._clean_answer(delta)
102
+ if text_to_yield: yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': text_to_yield}, 'finish_reason': None}]})}\n\n"
103
+ except Exception: logger.exception("Stream error"); raise
 
 
 
 
 
 
 
 
 
 
 
 
 
 
104
 
105
+ # ---------- NON-STREAM (DEBUG VERSION) ----------
106
  async def non_stream_proxy_response(self, req: ChatCompletionRequest) -> ChatCompletionResponse:
107
+ logger.debug("="*20 + " STARTING NON-STREAM DEBUG " + "="*20)
108
  ck = None
109
  try:
110
  body, headers, ck = await self._prep_upstream(req)
111
+ full_content_stream = []
112
  phase_cur = None
113
 
114
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
115
  if resp.status_code != 200:
 
116
  await cookie_manager.mark_cookie_failed(ck)
117
  error_detail = await resp.text()
118
  raise HTTPException(resp.status_code, f"Upstream error: {error_detail}")
119
 
 
120
  await cookie_manager.mark_cookie_success(ck)
121
 
 
122
  async for raw in resp.aiter_text():
123
  for line in raw.strip().split('\n'):
124
  line = line.strip()
 
127
  if payload_str == '[DONE]': break
128
  try:
129
  dat = json.loads(payload_str).get("data", {})
130
+ full_content_stream.append(dat) # Store the raw parsed data
131
  except (json.JSONDecodeError, AttributeError): continue
 
 
 
 
 
 
 
 
 
132
  else: continue
133
  break
134
+
135
+ # --- STAGE 1: LOG RAW COLLECTED DATA ---
136
+ logger.debug("-" * 10 + " STAGE 1: RAW DATA FROM Z.AI " + "-" * 10)
137
+ for i, dat in enumerate(full_content_stream):
138
+ # Use !r to get an unambiguous representation of the string
139
+ logger.debug(f"Chunk {i}: {dat!r}")
140
+ logger.debug("-" * 10 + " END STAGE 1 " + "-" * 10)
141
+
142
+
143
+ # --- STAGE 2: PROCESS AND LOG RAW JOINED STRINGS ---
144
+ raw_think_str = ''.join([d.get("delta_content", "") for d in full_content_stream if d.get("phase") == "thinking"])
145
+ raw_answer_str = ''.join([d.get("delta_content", "") for d in full_content_stream if d.get("phase") == "answer"])
146
 
147
+ # This is a fallback for chunks that might not have a phase.
148
+ # It's more complex but might catch the edge case.
149
+ phase_aware_think = []
150
+ phase_aware_answer = []
151
+ current_phase_for_build = None
152
+ for d in full_content_stream:
153
+ if 'phase' in d:
154
+ current_phase_for_build = d['phase']
155
+ if 'delta_content' in d:
156
+ if current_phase_for_build == 'thinking':
157
+ phase_aware_think.append(d['delta_content'])
158
+ elif current_phase_for_build == 'answer':
159
+ phase_aware_answer.append(d['delta_content'])
160
 
161
+ phase_aware_raw_answer_str = ''.join(phase_aware_answer)
162
+
163
+
164
+ logger.debug("-" * 10 + " STAGE 2: RAW JOINED STRINGS " + "-" * 10)
165
+ logger.debug(f"Phase-unaware Think String: {raw_think_str!r}")
166
+ logger.debug(f"Phase-unaware Answer String: {raw_answer_str!r}")
167
+ logger.debug(f"Phase-aware Answer String: {phase_aware_raw_answer_str!r}")
168
+ logger.debug("-" * 10 + " END STAGE 2 " + "-" * 10)
169
+
170
+
171
+ # --- STAGE 3: PROCESS AND LOG FINAL CLEANED TEXT ---
172
+ # We will use the more robust phase-aware string for the final result
173
+ think_text = self._clean_thinking(''.join(phase_aware_think)).strip()
174
+ ans_text = self._clean_answer(''.join(phase_aware_answer))
175
+
176
+
177
+ logger.debug("-" * 10 + " STAGE 3: FINAL CLEANED TEXT " + "-" * 10)
178
+ logger.debug(f"Final Think Text: {think_text!r}")
179
+ logger.debug(f"Final Answer Text: {ans_text!r}")
180
+ logger.debug("-" * 10 + " END STAGE 3 " + "-" * 10)
181
+
182
+
183
+ # Final construction
184
  final_content = ans_text
185
+ if settings.SHOW_THINK_TAGS and think_text:
186
+ newline = "\n" if ans_text and not ans_text.startswith(('\n', '\r')) else ""
187
+ final_content = f"<think>{think_text}</think>{newline}{ans_text}"
188
+
189
+ logger.debug(f"Final constructed content to be returned: {final_content!r}")
190
+ logger.debug("="*20 + " END NON-STREAM DEBUG " + "="*20)
191
 
 
 
 
 
 
192
 
193
  return ChatCompletionResponse(
194
  id=f"chatcmpl-{uuid.uuid4().hex[:29]}", created=int(time.time()), model=req.model,
195
  choices=[{"index": 0, "message": {"role": "assistant", "content": final_content}, "finish_reason": "stop"}],
196
  )
 
 
 
 
 
197
  except Exception:
198
+ logger.exception("Non-stream processing failed")
199
+ raise
200
 
201
+ # ---------- FastAPI entry ----------
202
  async def handle_chat_completion(self, req: ChatCompletionRequest):
203
+ # Force non-stream mode for this debug session
204
+ if req.stream:
205
+ logger.warning("Request specified streaming, but DEBUG handler is forcing non-stream mode.")
206
+
207
+ # Make sure stream is False to trigger the debug path
208
+ req.stream = False
209
  return await self.non_stream_proxy_response(req)