HEX
Server: Apache/2.4.65 (Ubuntu)
System: Linux ielts-store-v2 6.8.0-1036-gcp #38~22.04.1-Ubuntu SMP Thu Aug 14 01:19:18 UTC 2025 x86_64
User: root (0)
PHP: 7.2.34-54+ubuntu20.04.1+deb.sury.org+1
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,
Upload Files
File: //snap/google-cloud-cli/396/platform/gsutil/third_party/urllib3/dummyserver/asgi_proxy.py
from __future__ import annotations

import typing

import httpx
import trio
from hypercorn.typing import (
    ASGIReceiveCallable,
    ASGISendCallable,
    HTTPResponseBodyEvent,
    HTTPResponseStartEvent,
    HTTPScope,
    Scope,
)


async def _read_body(receive: ASGIReceiveCallable) -> bytes:
    body = bytearray()
    body_consumed = False
    while not body_consumed:
        event = await receive()
        if event["type"] == "http.request":
            body.extend(event["body"])
            body_consumed = not event["more_body"]
        else:
            raise ValueError(event["type"])
    return bytes(body)


class ProxyApp:
    """ASGI application that acts as a minimal HTTP forward proxy.

    ``GET`` and ``POST`` requests are re-issued with ``httpx`` and the
    response is relayed back; ``CONNECT`` requests open a TCP connection to
    the requested target and shuttle bytes in both directions.
    """

    def __init__(self, upstream_ca_certs: str | None = None):
        """Store the CA bundle used to verify upstream TLS connections.

        Args:
            upstream_ca_certs: Value passed as ``verify=`` to
                ``httpx.AsyncClient``. When ``None`` (or empty), ``True`` is
                passed instead.
        """
        self.upstream_ca_certs = upstream_ca_certs

    async def __call__(
        self, scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable
    ) -> None:
        """Dispatch an incoming ASGI HTTP request by method.

        Raises:
            AssertionError: If ``scope["type"]`` is not ``"http"``.
            ValueError: If the method is not ``GET``, ``POST`` or ``CONNECT``.
        """
        assert scope["type"] == "http"
        method = scope["method"]
        if method in ("GET", "POST"):
            await self.absolute_uri(scope, receive, send)
        elif method == "CONNECT":
            await self.connect(scope, send)
        else:
            raise ValueError(method)

    async def absolute_uri(
        self,
        scope: HTTPScope,
        receive: ASGIReceiveCallable,
        send: ASGISendCallable,
    ) -> None:
        """Forward the request upstream with ``httpx`` and relay the response.

        ``scope["path"]`` is used directly as the upstream URL.
        NOTE(review): this presumes the server puts the absolute-form request
        target in ``path`` for proxied requests -- confirm against the server.

        Only a fixed set of response headers is copied back, followed by a
        ``Content-Length`` computed from the fully buffered response body.
        """
        async with httpx.AsyncClient(verify=self.upstream_ca_certs or True) as client:
            client_response = await client.request(
                method=scope["method"],
                url=scope["path"],
                headers=list(scope["headers"]),
                content=await _read_body(receive),
            )

        relayed = ("Date", "Cache-Control", "Server", "Content-Type", "Location")
        # Copy only the headers that are present and non-empty upstream.
        headers = [
            (name.encode(), value.encode())
            for name in relayed
            if (value := client_response.headers.get(name))
        ]
        headers.append((b"Content-Length", str(len(client_response.content)).encode()))

        await send(
            HTTPResponseStartEvent(
                type="http.response.start",
                status=client_response.status_code,
                headers=headers,
            )
        )
        await send(
            HTTPResponseBodyEvent(
                type="http.response.body",
                body=client_response.content,
                more_body=False,
            )
        )

    @staticmethod
    def _split_authority(authority: str) -> tuple[str, int]:
        """Split a ``host:port`` CONNECT target into ``(host, port)``.

        The split happens on the last colon so that bracketed IPv6 literals
        such as ``[::1]:8080`` are handled; the brackets are stripped from the
        returned host.

        Raises:
            ValueError: If there is no ``:`` separator or the port is not an
                integer.
        """
        host, sep, port = authority.rpartition(":")
        if not sep:
            raise ValueError(authority)
        if host.startswith("[") and host.endswith("]"):
            host = host[1:-1]
        return host, int(port)

    async def connect(self, scope: HTTPScope, send: ASGISendCallable) -> None:
        """Handle ``CONNECT`` by tunnelling bytes between client and target.

        Opens a TCP stream to the ``host:port`` given in ``scope["path"]``,
        sends a ``200`` response with an open-ended body, then forwards data
        in both directions until each side finishes.

        Raises:
            ValueError: If ``scope["path"]`` is not a valid ``host:port``.
        """

        async def start_forward(
            reader: trio.SocketStream, writer: trio.SocketStream
        ) -> None:
            # Copy reader -> writer until EOF (empty read) or until the
            # reader has been closed, then close the writer.
            while True:
                try:
                    data = await reader.receive_some(4096)
                except trio.ClosedResourceError:
                    break
                if not data:
                    break
                await writer.send_all(data)
            await writer.aclose()

        host, port = self._split_authority(scope["path"])
        async with await trio.open_tcp_stream(host, port) as upstream:
            await send({"type": "http.response.start", "status": 200, "headers": []})
            await send({"type": "http.response.body", "body": b"", "more_body": True})

            # NOTE(review): "_transport" is a non-standard scope extension;
            # presumably the hosting server injects the raw client stream
            # there -- confirm against the server setup.
            client = typing.cast(trio.SocketStream, scope["extensions"]["_transport"])

            async with trio.open_nursery(strict_exception_groups=True) as nursery:
                nursery.start_soon(start_forward, client, upstream)
                nursery.start_soon(start_forward, upstream, client)