|
1 | 1 | import json |
2 | 2 | import os |
3 | 3 | import re |
| 4 | +from typing import ClassVar |
4 | 5 |
|
5 | 6 | from nazurin.models import Caption |
6 | | -from nazurin.utils import Request |
| 7 | +from nazurin.utils import Request, logger |
7 | 8 | from nazurin.utils.decorators import network_retry |
8 | 9 | from nazurin.utils.exceptions import NazurinError |
9 | 10 | from nazurin.utils.helpers import fromasctimeformat |
|
13 | 14 |
|
14 | 15 |
|
15 | 16 | class Weibo: |
| 17 | + tid: ClassVar[str] = "" |
| 18 | + cookies: ClassVar[dict] = {} |
| 19 | + |
| 20 | + VISITOR_SYSTEM_SUCCESS_CODE: ClassVar[int] = 20000000 |
| 21 | + |
16 | 22 | @network_retry |
17 | 23 | async def get_post(self, post_id: str): |
18 | 24 | """Fetch a post.""" |
19 | | - api = f"https://m.weibo.cn/detail/{post_id}" |
20 | | - async with Request() as request, request.get(api) as response: |
21 | | - response.raise_for_status() |
22 | | - html = await response.text() |
| 25 | + post_url = f"https://m.weibo.cn/detail/{post_id}" |
| 26 | + async with ( |
| 27 | + Request(cookies=Weibo.cookies) as request, |
| 28 | + request.get(post_url) as response, |
| 29 | + ): |
| 30 | + if response.url.host == "visitor.passport.weibo.cn": |
| 31 | + logger.info("Handling Weibo visitor system: {}", response.url) |
| 32 | + await self._handle_visitor_system(post_url, str(response.url)) |
| 33 | + async with ( |
| 34 | + Request(cookies=Weibo.cookies) as new_request, |
| 35 | + new_request.get(post_url) as new_response, |
| 36 | + ): |
| 37 | + new_response.raise_for_status() |
| 38 | + html = await new_response.text() |
| 39 | + else: |
| 40 | + response.raise_for_status() |
| 41 | + html = await response.text() |
23 | 42 | post = self.parse_html(html) |
24 | 43 | return post |
25 | 44 |
|
@@ -170,3 +189,57 @@ def parse_html(html) -> dict: |
170 | 189 | except json.JSONDecodeError: |
171 | 190 | raise NazurinError("Failed to parse post data") from None |
172 | 191 | return post |
| 192 | + |
| 193 | + async def _handle_visitor_system( |
| 194 | + self, post_url: str, visitor_system_url: str |
| 195 | + ) -> None: |
| 196 | + token_url = "https://visitor.passport.weibo.cn/visitor/genvisitor2" |
| 197 | + payload = { |
| 198 | + "cb": "visitor_gray_callback", |
| 199 | + "ver": "20250916", |
| 200 | + "tid": Weibo.tid, |
| 201 | + "from": "weibo", |
| 202 | + "webdriver": "false", |
| 203 | + "return_url": post_url, |
| 204 | + } |
| 205 | + async with ( |
| 206 | + Request( |
| 207 | + headers={ |
| 208 | + "Referer": visitor_system_url, |
| 209 | + } |
| 210 | + ) as request, |
| 211 | + request.post(token_url, data=payload) as response, |
| 212 | + ): |
| 213 | + if not response.ok: |
| 214 | + message = ( |
| 215 | + "Weibo visitor system token request failed " |
| 216 | + f"with status code {response.status}" |
| 217 | + ) |
| 218 | + logger.error( |
| 219 | + "{}: {}", |
| 220 | + message, |
| 221 | + await response.text(), |
| 222 | + ) |
| 223 | + raise NazurinError(message) |
| 224 | + match = re.search( |
| 225 | + r"visitor_gray_callback\((.*?)\);", |
| 226 | + await response.text(), |
| 227 | + re.MULTILINE, |
| 228 | + ) |
| 229 | + try: |
| 230 | + data = json.loads(match.group(1)) if match else None |
| 231 | + except json.JSONDecodeError: |
| 232 | + data = None |
| 233 | + if not data or data.get("retcode") != Weibo.VISITOR_SYSTEM_SUCCESS_CODE: |
| 234 | + logger.error( |
| 235 | + "Failed to handle Weibo visitor system, response: {}", |
| 236 | + await response.text(), |
| 237 | + ) |
| 238 | + raise NazurinError("Failed to get token from Weibo visitor system") |
| 239 | + Weibo.tid = data["data"]["tid"] |
| 240 | + Weibo.cookies = { |
| 241 | + "tid": Weibo.tid, |
| 242 | + "SUB": data["data"]["sub"], |
| 243 | + "SUBP": data["data"]["subp"], |
| 244 | + } |
| 245 | + logger.info("Successfully got token from Weibo visitor system") |
0 commit comments