一元网络论坛

 找回密码
 立即注册
搜索
热搜: 活动 交友 discuz
查看: 58|回复: 0

asyncio 速度比 ThreadPool 慢?DeepL Pro 实测告诉你原因。

[复制链接]

1万

主题

1万

帖子

5万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
56586
发表于 2024-8-27 02:59:26 | 显示全部楼层 |阅读模式
```python
"""Translate using deeplpro's dl_session cookie."""
# pylint: disable=broad-exception-caught
import asyncio
import sys
from time import monotonic
from random import randrange
from typing import Union
from concurrent.futures import ThreadPoolExecutor
import httpx
from loadtext import loadtext
COOKIE = "dl_session=81f94b98-497f-4f18-ab79-1ef25076f4c9"
HEADERS = {"Content-Type": "application/json", "Cookie": COOKIE}
URL = "https://api.deepl.com/jsonrpc"
def deepl_tr(text: str, source_lang: str = "auto", target_lang: str = "zh") -> Union[dict, Exception]:
    """Translate using deeplpro's dl_session cookie."""
    data = {
        "jsonrpc": "2.0",
        "method": "LMT_handle_texts",
        "id": randrange(sys.maxsize),
        "params": {
            "splitting": "newlines",
            "lang": {"source_lang_user_selected": source_lang, "target_lang": target_lang},
            "texts": [{"text": text, "requestAlternatives": 3}],
        },
    }
    try:
        _ = httpx.post(URL, json=data, headers=HEADERS)
        jdata = _.json()
        return jdata
    except Exception as exc:
        return exc
async def deepl_tr_async(text: str, source_lang: str = "auto", target_lang: str = "zh") -> Union[dict, Exception]:
    """Translate using deeplpro's dl_session cookie."""
    data = {
        "jsonrpc": "2.0",
        "method": "LMT_handle_texts",
        "id": randrange(sys.maxsize),
        "params": {
            "splitting": "newlines",
            "lang": {"source_lang_user_selected": source_lang, "target_lang": target_lang},
            "texts": [{"text": text, "requestAlternatives": 3}],
        },
    }
    async with httpx.AsyncClient() as client:
        try:
            _ = await client.post(URL, json=data, headers=HEADERS)
            jdata = _.json()
            return jdata
        except Exception as exc:
            return exc
def main():
    """Run."""
    texts = loadtext(r"2024-08-20.txt")
    then = monotonic()
    with ThreadPoolExecutor() as executor:
        _ = executor.map(deepl_tr, texts)
    print(*_)
    time_el = monotonic() - then
    print(f"{len(texts)}, {time_el:.2f} {time_el / len(texts):.2f}")
async def main_a():
    """Run async."""
    texts = loadtext(r"2024-08-20.txt")
    then = monotonic()
    coros = [deepl_tr_async(text) for text in texts]
    _ = await asyncio.gather(*coros)
    print(_)
    time_el = monotonic() - then
    print(f"{len(texts)}, {time_el:.2f} {time_el / len(texts):.2f}")
if __name__ == "__main__":
    asyncio.run(main_a())
    main()
```
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Archiver|手机版|小黑屋|一元网络论坛

GMT+8, 2024-9-20 14:34 , Processed in 0.073574 second(s), 20 queries .

Powered by Discuz! X3.4

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表