Files
publicpaste/publicpaste/web.py
T
尘曲 41508762ad Preserve click gesture for copy fallback
Use a synchronous paste request so the HTTP-compatible copy fallback can run inside the original button click when browser clipboard APIs are unavailable.
2026-05-13 01:45:34 +08:00

390 lines
14 KiB
Python

"""Small standard-library web service for publicpaste."""
from __future__ import annotations
import json
import os
import sys
import threading
import time
from dataclasses import dataclass
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from .core import PasteError, paste_text
MAX_REQUEST_BYTES = 1024 * 1024
@dataclass(frozen=True)
class Settings:
host: str
port: int
data_dir: Path
timeout: float
overall_timeout: float
verify: bool
def _env_float(name: str, default: float) -> float:
value = os.environ.get(name)
if value is None or value == "":
return default
try:
parsed = float(value)
except ValueError as exc:
raise ValueError(f"{name} must be a number") from exc
if parsed <= 0:
raise ValueError(f"{name} must be positive")
return parsed
def _env_int(name: str, default: int) -> int:
value = os.environ.get(name)
if value is None or value == "":
return default
try:
parsed = int(value)
except ValueError as exc:
raise ValueError(f"{name} must be an integer") from exc
if parsed <= 0:
raise ValueError(f"{name} must be positive")
return parsed
def _env_bool(name: str, default: bool) -> bool:
value = os.environ.get(name)
if value is None or value == "":
return default
return value.strip().lower() in {"1", "true", "yes", "on"}
def load_settings() -> Settings:
return Settings(
host=os.environ.get("PUBLICPASTE_HOST") or "0.0.0.0",
port=_env_int("PUBLICPASTE_PORT", 12072),
data_dir=Path(os.environ.get("PUBLICPASTE_DATA_DIR") or "/data"),
timeout=_env_float("PUBLICPASTE_TIMEOUT", 8.0),
overall_timeout=_env_float("PUBLICPASTE_OVERALL_TIMEOUT", 12.0),
verify=_env_bool("PUBLICPASTE_VERIFY", False),
)
class PublicPasteServer(ThreadingHTTPServer):
settings: Settings
log_lock: threading.Lock
def __init__(self, server_address: tuple[str, int], settings: Settings) -> None:
super().__init__(server_address, PublicPasteHandler)
self.settings = settings
self.log_lock = threading.Lock()
@property
def log_path(self) -> Path:
return self.settings.data_dir / "publicpaste.log"
def append_api_log(self, entry: dict[str, Any]) -> None:
entry.setdefault("time", time.strftime("%Y-%m-%dT%H:%M:%S%z"))
line = json.dumps(entry, ensure_ascii=False, sort_keys=True)
with self.log_lock:
with self.log_path.open("a", encoding="utf-8") as log_file:
log_file.write(line + "\n")
class PublicPasteHandler(BaseHTTPRequestHandler):
server: PublicPasteServer
def do_GET(self) -> None: # noqa: N802 - BaseHTTPRequestHandler API
if self.path == "/" or self.path.startswith("/?"):
self._send_html(INDEX_HTML)
return
if self.path == "/health" or self.path.startswith("/health?"):
self._send_json(HTTPStatus.OK, {"ok": True, "service": "publicpaste"})
return
self._send_json(HTTPStatus.NOT_FOUND, {"error": "not found"})
def do_POST(self) -> None: # noqa: N802 - BaseHTTPRequestHandler API
if self.path != "/api/paste":
self._send_json(HTTPStatus.NOT_FOUND, {"error": "not found"})
return
started = time.monotonic()
try:
payload = self._read_json_body()
text = payload.get("text") if isinstance(payload, dict) else None
if not isinstance(text, str) or text.strip() == "":
raise ClientError(HTTPStatus.BAD_REQUEST, "text is required")
result = paste_text(
text,
timeout=self.server.settings.timeout,
overall_timeout=self.server.settings.overall_timeout,
verify=self.server.settings.verify,
)
except ClientError as exc:
self._log_api(False, int(exc.status), error=exc.message, duration=time.monotonic() - started)
self._send_json(exc.status, {"error": exc.message})
return
except json.JSONDecodeError:
self._log_api(False, 400, error="invalid json", duration=time.monotonic() - started)
self._send_json(HTTPStatus.BAD_REQUEST, {"error": "invalid json"})
return
except PasteError as exc:
self._log_api(False, 502, error=str(exc), duration=time.monotonic() - started)
self._send_json(HTTPStatus.BAD_GATEWAY, {"error": "all providers failed"})
return
except Exception as exc: # noqa: BLE001 - keep HTTP errors concise.
self._log_api(False, 500, error=str(exc), duration=time.monotonic() - started)
self._send_json(HTTPStatus.INTERNAL_SERVER_ERROR, {"error": "server error"})
return
body = {"url": result.url, "provider": result.provider}
self._log_api(True, 200, provider=result.provider, url=result.url, duration=time.monotonic() - started)
self._send_json(HTTPStatus.OK, body)
def log_message(self, format: str, *args: Any) -> None: # noqa: A002 - inherited name.
print(f"{self.address_string()} - {format % args}", file=sys.stderr)
def _read_json_body(self) -> Any:
content_length = self.headers.get("Content-Length")
if content_length is None:
raise ClientError(HTTPStatus.BAD_REQUEST, "missing content length")
try:
length = int(content_length)
except ValueError as exc:
raise ClientError(HTTPStatus.BAD_REQUEST, "invalid content length") from exc
if length > MAX_REQUEST_BYTES:
raise ClientError(HTTPStatus.REQUEST_ENTITY_TOO_LARGE, "request too large")
body = self.rfile.read(length)
try:
return json.loads(body.decode("utf-8"))
except UnicodeDecodeError as exc:
raise json.JSONDecodeError("invalid utf-8", "", 0) from exc
def _send_html(self, content: str) -> None:
body = content.encode("utf-8")
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.send_header("Cache-Control", "no-store, no-cache, must-revalidate, max-age=0")
self.send_header("Pragma", "no-cache")
self.end_headers()
self.wfile.write(body)
def _send_json(self, status: HTTPStatus, payload: dict[str, Any]) -> None:
body = json.dumps(payload, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.send_header("Cache-Control", "no-store")
self.end_headers()
self.wfile.write(body)
def _log_api(
self,
ok: bool,
status: int,
*,
duration: float,
error: str | None = None,
provider: str | None = None,
url: str | None = None,
) -> None:
entry: dict[str, Any] = {
"event": "api.paste",
"ok": ok,
"status": status,
"duration_ms": round(duration * 1000, 2),
"client": self.client_address[0],
}
if error:
entry["error"] = error
if provider:
entry["provider"] = provider
if url:
entry["url"] = url
self.server.append_api_log(entry)
class ClientError(Exception):
def __init__(self, status: HTTPStatus, message: str) -> None:
super().__init__(message)
self.status = status
self.message = message
INDEX_HTML = f"""<!doctype html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>publicpaste</title>
<style>
:root {{ color-scheme: light dark; font-family: system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; }}
body {{ margin: 0; min-height: 100vh; display: grid; place-items: center; background: #f4f6f8; color: #17202a; }}
main {{ width: min(880px, calc(100vw - 32px)); background: white; border-radius: 18px; box-shadow: 0 18px 45px rgba(0,0,0,.10); padding: 28px; }}
h1 {{ margin: 0 0 8px; font-size: 28px; }}
p {{ color: #59636e; }}
textarea {{ box-sizing: border-box; width: 100%; min-height: 280px; resize: vertical; border: 1px solid #ccd3da; border-radius: 12px; padding: 14px; font: 15px/1.5 ui-monospace, SFMono-Regular, Menlo, Consolas, monospace; }}
button {{ border: 0; border-radius: 999px; padding: 11px 18px; background: #1769e0; color: white; font-weight: 700; cursor: pointer; }}
button:disabled {{ opacity: .65; cursor: wait; }}
.row {{ display: flex; gap: 12px; align-items: center; flex-wrap: wrap; margin-top: 14px; }}
.status {{ min-height: 24px; color: #59636e; }}
.result {{ display: none; margin-top: 18px; padding: 14px; border: 1px solid #d8dee6; border-radius: 12px; background: #f9fafb; word-break: break-all; }}
.urlbox {{ box-sizing: border-box; width: 100%; margin-top: 8px; border: 1px solid #ccd3da; border-radius: 10px; padding: 10px 12px; font: 15px/1.4 ui-monospace, SFMono-Regular, Menlo, Consolas, monospace; }}
.error {{ color: #b42318; }}
@media (prefers-color-scheme: dark) {{ body {{ background: #111827; color: #f9fafb; }} main {{ background: #1f2937; }} p,.status {{ color: #cbd5e1; }} textarea,.result,.urlbox {{ background: #111827; color: #f9fafb; border-color: #374151; }} }}
</style>
</head>
<body>
<main>
<h1>publicpaste</h1>
<p>输入要发布的文本,点击“完成”后会返回公开 paste 链接并自动复制到剪贴板。</p>
<textarea id="text" maxlength="{MAX_REQUEST_BYTES}" placeholder="在这里输入文本..."></textarea>
<div class="row">
<button id="submit" type="button">完成</button>
<button id="copy" type="button" style="display:none">再次复制/选中链接</button>
<span id="status" class="status"></span>
</div>
<section id="result" class="result" aria-live="polite">
<div>Provider: <strong id="provider"></strong></div>
<div>URL: <a id="url" href="#" target="_blank" rel="noopener noreferrer"></a></div>
<input id="urlInput" class="urlbox" readonly value="" aria-label="Paste URL">
</section>
</main>
<script>
const text = document.getElementById('text');
const submit = document.getElementById('submit');
const copy = document.getElementById('copy');
const statusEl = document.getElementById('status');
const resultEl = document.getElementById('result');
const providerEl = document.getElementById('provider');
const urlEl = document.getElementById('url');
const urlInput = document.getElementById('urlInput');
let currentUrl = '';
function setStatus(message, isError = false) {{
statusEl.textContent = message;
statusEl.className = isError ? 'status error' : 'status';
}}
function selectUrl() {{
urlInput.focus();
urlInput.select();
urlInput.setSelectionRange(0, urlInput.value.length);
}}
function legacyCopy() {{
selectUrl();
try {{
return document.execCommand('copy');
}} catch (err) {{
return false;
}}
}}
async function copyUrl({{ automatic = false }} = {{}}) {{
if (!currentUrl) return false;
// execCommand works in many HTTP/IP scenarios, but it usually must run
// directly inside a click event. Try it first.
if (legacyCopy()) {{
setStatus('链接已复制到剪贴板');
return true;
}}
// Async Clipboard only works in secure contexts in most browsers.
if (navigator.clipboard && window.isSecureContext) {{
try {{
await navigator.clipboard.writeText(currentUrl);
setStatus('链接已复制到剪贴板');
return true;
}} catch (err) {{
// Keep the selected input visible for manual copy below.
}}
}}
selectUrl();
setStatus(
automatic
? '浏览器限制了自动复制,链接已选中,请按 Ctrl+C / ⌘+C 复制'
: '浏览器限制了复制,链接已选中,请按 Ctrl+C / ⌘+C 复制',
true,
);
return false;
}}
function requestPasteSynchronously(value) {{
const xhr = new XMLHttpRequest();
xhr.open('POST', '/api/paste', false);
xhr.setRequestHeader('Content-Type', 'application/json');
xhr.send(JSON.stringify({{ text: value }}));
let data = {{}};
try {{
data = JSON.parse(xhr.responseText || '{{}}');
}} catch (err) {{
data = {{ error: 'invalid response' }};
}}
if (xhr.status < 200 || xhr.status >= 300) {{
throw new Error(data.error || 'request failed');
}}
return data;
}}
submit.addEventListener('click', () => {{
const value = text.value;
if (!value.trim()) {{
setStatus('请输入文本', true);
return;
}}
submit.disabled = true;
copy.style.display = 'none';
resultEl.style.display = 'none';
setStatus('正在提交...');
try {{
const data = requestPasteSynchronously(value);
currentUrl = data.url;
providerEl.textContent = data.provider || '';
urlEl.textContent = currentUrl;
urlEl.href = currentUrl;
urlInput.value = currentUrl;
resultEl.style.display = 'block';
copy.style.display = 'inline-block';
setStatus('提交成功,正在复制链接...');
void copyUrl({{ automatic: true }});
}} catch (err) {{
setStatus(err instanceof Error ? err.message : '提交失败', true);
}} finally {{
submit.disabled = false;
}}
}});
copy.addEventListener('click', () => {{ void copyUrl(); }});
</script>
</body>
</html>
"""
def main() -> int:
try:
settings = load_settings()
settings.data_dir.mkdir(parents=True, exist_ok=True)
server = PublicPasteServer((settings.host, settings.port), settings)
except Exception as exc: # noqa: BLE001
print(f"publicpaste-web: {exc}", file=sys.stderr)
return 1
print(f"publicpaste-web: listening on {settings.host}:{settings.port}", file=sys.stderr)
try:
server.serve_forever()
except KeyboardInterrupt:
pass
finally:
server.server_close()
return 0
if __name__ == "__main__": # pragma: no cover
raise SystemExit(main())