Add public paste CLI and skill
Provide a zero-dependency Python tool that races multiple public paste providers and documents safe AI-tool usage through a project skill.
This commit is contained in:
@@ -1,2 +1,10 @@
|
||||
.narrafork/
|
||||
.worktrees/
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
.pytest_cache/
|
||||
.coverage
|
||||
build/
|
||||
dist/
|
||||
*.egg-info/
|
||||
.venv/
|
||||
|
||||
@@ -0,0 +1,111 @@
|
||||
# publicpaste
|
||||
|
||||
`publicpaste` is a tiny Python package and CLI that posts text to multiple public paste providers concurrently and returns the first successful direct URL.
|
||||
|
||||
- Python 3 standard library first
|
||||
- zero runtime dependencies
|
||||
- CLI and Python API
|
||||
- concurrent provider fan-out with first-success return
|
||||
- optional verification of the returned URL
|
||||
|
||||
> Only paste text that is safe to publish. Returned links are public, and third-party services have their own retention and acceptable-use policies.
|
||||
|
||||
## Install
|
||||
|
||||
From this repository:
|
||||
|
||||
```sh
|
||||
python -m pip install .
|
||||
```
|
||||
|
||||
## CLI
|
||||
|
||||
```sh
|
||||
publicpaste "hello from publicpaste"
|
||||
echo "hello from stdin" | publicpaste
|
||||
publicpaste --file a.txt
|
||||
```
|
||||
|
||||
By default stdout contains only the URL, making the command easy to pipe:
|
||||
|
||||
```sh
|
||||
url=$(publicpaste --file notes.txt)
|
||||
```
|
||||
|
||||
JSON output includes the provider name:
|
||||
|
||||
```sh
|
||||
publicpaste --json "hello"
|
||||
# {"url":"https://...","provider":"paste.rs"}
|
||||
```
|
||||
|
||||
Useful options:
|
||||
|
||||
- `--timeout SECONDS`: per-provider timeout, default `8.0`
|
||||
- `--overall-timeout SECONDS`: total wait budget, default `12.0`
|
||||
- `--json`: print `url` and `provider` as JSON
|
||||
- `--verbose`: print provider diagnostics on failure
|
||||
- `--verify`: GET the returned URL and confirm a 2xx response containing a text fragment
|
||||
- `--list-providers`: list provider names
|
||||
- `--provider NAME`: use only a provider; repeat to select several
|
||||
- `--disable-provider NAME`: disable a provider; repeatable
|
||||
|
||||
Failures print a concise error to stderr and exit with status `1`.
|
||||
|
||||
## Python API
|
||||
|
||||
```python
|
||||
from publicpaste import paste_text
|
||||
|
||||
result = paste_text("hello", timeout=8.0, overall_timeout=12.0)
|
||||
print(result.url)
|
||||
print(result.provider)
|
||||
```
|
||||
|
||||
Signature:
|
||||
|
||||
```python
|
||||
paste_text(
|
||||
text,
|
||||
timeout=8.0,
|
||||
overall_timeout=12.0,
|
||||
providers=None,
|
||||
disabled_providers=None,
|
||||
verify=False,
|
||||
)
|
||||
```
|
||||
|
||||
`providers` may be a sequence of provider names or provider instances. `disabled_providers` removes providers by name. The function returns a `PasteResult` dataclass with at least `url` and `provider`. If every provider fails, it raises `PasteError` with aggregated per-provider errors.
|
||||
|
||||
## Providers
|
||||
|
||||
Default enabled providers:
|
||||
|
||||
- `paste.rs` — `POST https://paste.rs/`, raw UTF-8 body; `GET /<id>` returns plain text.
|
||||
- `dpaste.org` — `POST https://dpaste.org/api/`, form fields `content`, `lexer=plain`, `format=url`, `expires_on=2592000`.
|
||||
- `0x0.st` — multipart field `file`, filename `paste.txt`, `text/plain`; this is a public file host, so use only non-sensitive text.
|
||||
- `snippet.host` — form fields `content`, `visibility=2`, `expires=1month`, `language=plain text`; parses `Location` or HTML URL.
|
||||
- `paste.centos.org` — `POST https://paste.centos.org/`, form fields `code`, `lang=text`, `expire=1440`; parses the final redirected view URL. Its documented `/api/create` endpoint currently requires an API key, so the browser-compatible form endpoint is used.
|
||||
- `termbin.com` — TCP port `9999`; low priority but enabled by default.
|
||||
|
||||
Each provider has a response parsing method so parsing can be unit tested without network access.
|
||||
|
||||
## Validation and verification
|
||||
|
||||
Provider responses are accepted only when the URL uses `http` or `https` and matches the expected provider host and path shape. By default `publicpaste` does not fetch the returned URL after creation, keeping the fastest provider path quick.
|
||||
|
||||
When `verify=True` or `--verify` is used, `publicpaste` performs a GET against the returned direct URL and checks that the response is 2xx and contains a fragment of the original text. HTML escaping is tolerated.
|
||||
|
||||
## Why not PrivateBin, privydrop, clipboard-style services, or similar?
|
||||
|
||||
This project intentionally targets simple public paste endpoints with stable, scriptable APIs. It does not include PrivateBin, privydrop, clipboard-style services, or similar tools when they require browser JavaScript, P2P flows, access codes, encrypted URL fragments, unstable/undocumented APIs, or interaction patterns that do not reliably return a direct public URL from a simple request.
|
||||
|
||||
The initially reviewed services that are not enabled by default for those reasons include `clipboard.im`, `clipboard.run`, `clipboardify.com`, `clipboardonline.com`, `privydrop.app`, `privatebin.net`, `paste.drhack.net`, `pasteme.cn`, `paste.sdjz.wiki`, and `myonlineclipboard.com`.
|
||||
|
||||
## Tests
|
||||
|
||||
Tests are unit-only and do not access the public internet:
|
||||
|
||||
```sh
|
||||
python -m pytest
|
||||
```
|
||||
@@ -0,0 +1,7 @@
|
||||
"""publicpaste: create a public paste via the first working provider."""
|
||||
|
||||
from .core import PasteError, PasteResult, paste_text
|
||||
from .providers import ProviderError, provider_names
|
||||
|
||||
__all__ = ["PasteError", "PasteResult", "ProviderError", "paste_text", "provider_names"]
|
||||
__version__ = "0.1.0"
|
||||
@@ -0,0 +1,5 @@
|
||||
"""Run publicpaste as ``python -m publicpaste``."""
|
||||
|
||||
from .cli import main
|
||||
|
||||
raise SystemExit(main())
|
||||
@@ -0,0 +1,75 @@
|
||||
"""Command-line interface for publicpaste."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Sequence, TextIO
|
||||
|
||||
from .core import PasteError, paste_text
|
||||
from .providers import ProviderError, provider_names
|
||||
|
||||
|
||||
def build_parser() -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(description="Paste text to the first working public paste provider.")
|
||||
parser.add_argument("text", nargs="?", help="text to paste; stdin is used when omitted")
|
||||
parser.add_argument("--file", "-f", type=Path, help="read text from a file")
|
||||
parser.add_argument("--timeout", type=float, default=8.0, help="per-provider timeout in seconds")
|
||||
parser.add_argument("--overall-timeout", type=float, default=12.0, help="overall timeout in seconds")
|
||||
parser.add_argument("--json", action="store_true", help="print JSON instead of a bare URL")
|
||||
parser.add_argument("--verbose", "-v", action="store_true", help="print provider diagnostics on failure")
|
||||
parser.add_argument("--verify", action="store_true", help="GET the returned URL and check it contains the text")
|
||||
parser.add_argument("--list-providers", action="store_true", help="list available provider names and exit")
|
||||
parser.add_argument("--provider", action="append", dest="providers", help="enable only this provider; repeatable")
|
||||
parser.add_argument("--disable-provider", action="append", default=[], help="disable a provider; repeatable")
|
||||
return parser
|
||||
|
||||
|
||||
def read_input(args: argparse.Namespace, stdin: TextIO) -> str:
|
||||
sources = int(args.text is not None) + int(args.file is not None)
|
||||
if sources > 1:
|
||||
raise ValueError("provide text argument or --file, not both")
|
||||
if args.file is not None:
|
||||
return args.file.read_text(encoding="utf-8")
|
||||
if args.text is not None:
|
||||
return args.text
|
||||
return stdin.read()
|
||||
|
||||
|
||||
def main(argv: Sequence[str] | None = None) -> int:
|
||||
parser = build_parser()
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
if args.list_providers:
|
||||
for name in provider_names():
|
||||
print(name)
|
||||
return 0
|
||||
|
||||
try:
|
||||
text = read_input(args, sys.stdin)
|
||||
result = paste_text(
|
||||
text,
|
||||
timeout=args.timeout,
|
||||
overall_timeout=args.overall_timeout,
|
||||
providers=args.providers,
|
||||
disabled_providers=args.disable_provider,
|
||||
verify=args.verify,
|
||||
)
|
||||
except (OSError, ValueError, ProviderError, PasteError) as exc:
|
||||
print(f"publicpaste: {exc}", file=sys.stderr)
|
||||
if args.verbose and isinstance(exc, PasteError) and exc.errors:
|
||||
for provider, error in sorted(exc.errors.items()):
|
||||
print(f" {provider}: {error}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
if args.json:
|
||||
print(json.dumps({"url": result.url, "provider": result.provider}, ensure_ascii=False))
|
||||
else:
|
||||
print(result.url)
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
raise SystemExit(main())
|
||||
@@ -0,0 +1,136 @@
|
||||
"""Core publicpaste API."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import queue
|
||||
import threading
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Sequence
|
||||
|
||||
from .http import http_request
|
||||
from .providers import PasteProvider, ProviderError, ProviderResult, select_providers
|
||||
from .validation import body_contains_text_fragment
|
||||
|
||||
ProviderSelection = Sequence[str | PasteProvider]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class PasteResult:
|
||||
"""Successful paste result."""
|
||||
|
||||
url: str
|
||||
provider: str
|
||||
|
||||
|
||||
class PasteError(RuntimeError):
|
||||
"""Raised when all providers fail."""
|
||||
|
||||
def __init__(self, message: str, errors: dict[str, str] | None = None) -> None:
|
||||
super().__init__(message)
|
||||
self.errors = errors or {}
|
||||
|
||||
|
||||
def _coerce_providers(
|
||||
providers: ProviderSelection | None,
|
||||
disabled_providers: Sequence[str] | None,
|
||||
) -> tuple[PasteProvider, ...]:
|
||||
if providers is None or all(isinstance(provider, str) for provider in providers):
|
||||
return select_providers(
|
||||
providers=tuple(provider for provider in providers or () if isinstance(provider, str)) or None,
|
||||
disabled_providers=tuple(disabled_providers or ()),
|
||||
)
|
||||
if disabled_providers:
|
||||
disabled = set(disabled_providers)
|
||||
selected = tuple(
|
||||
provider for provider in providers if isinstance(provider, PasteProvider) and provider.name not in disabled
|
||||
)
|
||||
else:
|
||||
selected = tuple(provider for provider in providers if isinstance(provider, PasteProvider))
|
||||
if not selected:
|
||||
raise PasteError("no providers enabled")
|
||||
return selected
|
||||
|
||||
|
||||
def _verify_result(result: PasteResult, text: str, *, timeout: float) -> None:
|
||||
response = http_request(result.url, timeout=timeout)
|
||||
if response.status < 200 or response.status >= 300:
|
||||
raise ProviderError(f"verification returned HTTP {response.status}")
|
||||
if not body_contains_text_fragment(response.body, text):
|
||||
raise ProviderError("verification did not find pasted text")
|
||||
|
||||
|
||||
def paste_text(
|
||||
text: str,
|
||||
timeout: float = 8.0,
|
||||
overall_timeout: float = 12.0,
|
||||
providers: ProviderSelection | None = None,
|
||||
disabled_providers: Sequence[str] | None = None,
|
||||
verify: bool = False,
|
||||
) -> PasteResult:
|
||||
"""Create a public paste and return the first successful direct URL.
|
||||
|
||||
Providers are attempted concurrently. The first provider that returns a valid
|
||||
URL wins; remaining in-flight requests are ignored.
|
||||
"""
|
||||
|
||||
if not isinstance(text, str):
|
||||
raise TypeError("text must be str")
|
||||
if timeout <= 0:
|
||||
raise ValueError("timeout must be positive")
|
||||
if overall_timeout <= 0:
|
||||
raise ValueError("overall_timeout must be positive")
|
||||
|
||||
selected = _coerce_providers(providers, disabled_providers)
|
||||
errors: dict[str, str] = {}
|
||||
start = time.monotonic()
|
||||
|
||||
results: queue.Queue[tuple[PasteProvider, ProviderResult | None, BaseException | None]] = queue.Queue()
|
||||
stop_event = threading.Event()
|
||||
|
||||
def worker(provider: PasteProvider) -> None:
|
||||
try:
|
||||
provider_result = provider.paste(text, timeout=timeout)
|
||||
if not stop_event.is_set():
|
||||
results.put((provider, provider_result, None))
|
||||
except BaseException as exc: # noqa: BLE001 - aggregate provider failures for the caller.
|
||||
if not stop_event.is_set():
|
||||
results.put((provider, None, exc))
|
||||
|
||||
threads = [threading.Thread(target=worker, args=(provider,), daemon=True) for provider in selected]
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
|
||||
remaining_providers = len(threads)
|
||||
while remaining_providers:
|
||||
remaining = overall_timeout - (time.monotonic() - start)
|
||||
if remaining <= 0:
|
||||
break
|
||||
try:
|
||||
provider, value, exc = results.get(timeout=remaining)
|
||||
except queue.Empty:
|
||||
break
|
||||
remaining_providers -= 1
|
||||
if exc is not None:
|
||||
errors[provider.name] = str(exc)
|
||||
continue
|
||||
try:
|
||||
if value is None:
|
||||
raise ProviderError("provider returned no result")
|
||||
result = PasteResult(url=value.url, provider=value.provider)
|
||||
if verify:
|
||||
verify_remaining = overall_timeout - (time.monotonic() - start)
|
||||
_verify_result(result, text, timeout=min(timeout, max(0.1, verify_remaining)))
|
||||
stop_event.set()
|
||||
return result
|
||||
except Exception as verify_exc: # noqa: BLE001
|
||||
errors[provider.name] = str(verify_exc)
|
||||
|
||||
stop_event.set()
|
||||
for provider in selected:
|
||||
errors.setdefault(provider.name, "timed out")
|
||||
details = "; ".join(f"{name}: {error}" for name, error in sorted(errors.items()))
|
||||
raise PasteError(f"all providers failed{': ' + details if details else ''}", errors)
|
||||
|
||||
|
||||
__all__ = ["PasteError", "PasteResult", "paste_text"]
|
||||
@@ -0,0 +1,104 @@
|
||||
"""Small standard-library HTTP helpers."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import socket
|
||||
import ssl
|
||||
import uuid
|
||||
from dataclasses import dataclass
|
||||
from typing import Mapping
|
||||
from urllib import parse, request
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class HttpResponse:
|
||||
status: int
|
||||
headers: Mapping[str, str]
|
||||
body: bytes
|
||||
url: str
|
||||
|
||||
@property
|
||||
def text(self) -> str:
|
||||
return self.body.decode("utf-8", errors="replace")
|
||||
|
||||
|
||||
def http_request(
|
||||
url: str,
|
||||
*,
|
||||
method: str = "GET",
|
||||
data: bytes | None = None,
|
||||
headers: Mapping[str, str] | None = None,
|
||||
timeout: float = 8.0,
|
||||
) -> HttpResponse:
|
||||
"""Perform a simple HTTP request."""
|
||||
|
||||
request_headers = {"User-Agent": "publicpaste/0.1 (+https://git.mcraft.cn/sinvo/publicpaste)"}
|
||||
request_headers.update(dict(headers or {}))
|
||||
req = request.Request(url, data=data, headers=request_headers, method=method)
|
||||
with request.urlopen(req, timeout=timeout) as resp: # noqa: S310 - public paste targets are user requested.
|
||||
body = resp.read()
|
||||
response_headers = {k.lower(): v for k, v in resp.headers.items()}
|
||||
response_headers.setdefault("x-final-url", resp.geturl())
|
||||
return HttpResponse(
|
||||
status=int(resp.status),
|
||||
headers=response_headers,
|
||||
body=body,
|
||||
url=resp.geturl(),
|
||||
)
|
||||
|
||||
|
||||
def form_encode(fields: Mapping[str, str]) -> tuple[bytes, dict[str, str]]:
|
||||
data = parse.urlencode(fields).encode("utf-8")
|
||||
return data, {"Content-Type": "application/x-www-form-urlencoded; charset=utf-8"}
|
||||
|
||||
|
||||
def multipart_form_data(
|
||||
*,
|
||||
field_name: str,
|
||||
filename: str,
|
||||
content: bytes,
|
||||
content_type: str = "application/octet-stream",
|
||||
) -> tuple[bytes, dict[str, str]]:
|
||||
boundary = f"----publicpaste-{uuid.uuid4().hex}"
|
||||
parts = [
|
||||
f"--{boundary}\r\n".encode(),
|
||||
(
|
||||
f'Content-Disposition: form-data; name="{field_name}"; filename="{filename}"\r\n'
|
||||
f"Content-Type: {content_type}\r\n\r\n"
|
||||
).encode("utf-8"),
|
||||
content,
|
||||
b"\r\n",
|
||||
f"--{boundary}--\r\n".encode(),
|
||||
]
|
||||
return b"".join(parts), {"Content-Type": f"multipart/form-data; boundary={boundary}"}
|
||||
|
||||
|
||||
def tcp_send(host: str, port: int, payload: bytes, *, timeout: float = 8.0) -> str:
|
||||
"""Send bytes to a TCP service and return its UTF-8 response."""
|
||||
|
||||
with socket.create_connection((host, port), timeout=timeout) as sock:
|
||||
sock.settimeout(timeout)
|
||||
sock.sendall(payload)
|
||||
try:
|
||||
sock.shutdown(socket.SHUT_WR)
|
||||
except OSError:
|
||||
pass
|
||||
chunks: list[bytes] = []
|
||||
while True:
|
||||
chunk = sock.recv(4096)
|
||||
if not chunk:
|
||||
break
|
||||
chunks.append(chunk)
|
||||
return b"".join(chunks).decode("utf-8", errors="replace")
|
||||
|
||||
|
||||
def json_loads_maybe(text: str) -> object | None:
|
||||
try:
|
||||
return json.loads(text)
|
||||
except json.JSONDecodeError:
|
||||
return None
|
||||
|
||||
|
||||
def default_ssl_context() -> ssl.SSLContext:
|
||||
return ssl.create_default_context()
|
||||
@@ -0,0 +1,273 @@
|
||||
"""Paste provider implementations."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from typing import Callable, ClassVar
|
||||
from urllib.parse import urljoin
|
||||
|
||||
from .http import form_encode, http_request, json_loads_maybe, multipart_form_data, tcp_send
|
||||
from .validation import ValidationError, validate_url
|
||||
|
||||
|
||||
class ProviderError(RuntimeError):
|
||||
"""Raised when a provider cannot create a paste."""
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ProviderResult:
|
||||
provider: str
|
||||
url: str
|
||||
|
||||
|
||||
class PasteProvider:
|
||||
"""Base class for paste providers."""
|
||||
|
||||
name: ClassVar[str]
|
||||
priority: ClassVar[int] = 100
|
||||
|
||||
def paste(self, text: str, *, timeout: float) -> ProviderResult:
|
||||
raise NotImplementedError
|
||||
|
||||
def _result(self, url: str) -> ProviderResult:
|
||||
try:
|
||||
validated_url = validate_url(self.name, url.strip())
|
||||
except ValidationError as exc:
|
||||
raise ProviderError(str(exc)) from exc
|
||||
return ProviderResult(provider=self.name, url=validated_url)
|
||||
|
||||
|
||||
_URL_RE = re.compile(r"https?://[^\s'\"<>]+")
|
||||
|
||||
|
||||
def first_url(text: str) -> str | None:
|
||||
match = _URL_RE.search(text)
|
||||
return match.group(0).rstrip(".,;)]") if match else None
|
||||
|
||||
|
||||
class PasteRsProvider(PasteProvider):
|
||||
name = "paste.rs"
|
||||
priority = 10
|
||||
endpoint = "https://paste.rs/"
|
||||
|
||||
@staticmethod
|
||||
def parse_response(body: bytes | str, headers: dict[str, str] | None = None) -> str:
|
||||
text = body.decode("utf-8", errors="replace") if isinstance(body, bytes) else body
|
||||
url = first_url(text.strip())
|
||||
if not url:
|
||||
raise ProviderError("paste.rs did not return a URL")
|
||||
return url
|
||||
|
||||
def paste(self, text: str, *, timeout: float) -> ProviderResult:
|
||||
resp = http_request(
|
||||
self.endpoint,
|
||||
method="POST",
|
||||
data=text.encode("utf-8"),
|
||||
headers={"Content-Type": "text/plain; charset=utf-8"},
|
||||
timeout=timeout,
|
||||
)
|
||||
if resp.status not in (201, 206):
|
||||
raise ProviderError(f"paste.rs returned HTTP {resp.status}")
|
||||
return self._result(self.parse_response(resp.body, dict(resp.headers)))
|
||||
|
||||
|
||||
class DpasteProvider(PasteProvider):
|
||||
name = "dpaste.org"
|
||||
priority = 20
|
||||
endpoint = "https://dpaste.org/api/"
|
||||
|
||||
@staticmethod
|
||||
def parse_response(body: bytes | str, headers: dict[str, str] | None = None) -> str:
|
||||
text = body.decode("utf-8", errors="replace") if isinstance(body, bytes) else body
|
||||
url = first_url(text.strip())
|
||||
if not url:
|
||||
raise ProviderError("dpaste.org did not return a URL")
|
||||
return url
|
||||
|
||||
def paste(self, text: str, *, timeout: float) -> ProviderResult:
|
||||
data, headers = form_encode(
|
||||
{
|
||||
"content": text,
|
||||
"lexer": "plain",
|
||||
"format": "url",
|
||||
"expires_on": "2592000",
|
||||
}
|
||||
)
|
||||
resp = http_request(self.endpoint, method="POST", data=data, headers=headers, timeout=timeout)
|
||||
if resp.status < 200 or resp.status >= 300:
|
||||
raise ProviderError(f"dpaste.org returned HTTP {resp.status}")
|
||||
return self._result(self.parse_response(resp.body, dict(resp.headers)))
|
||||
|
||||
|
||||
class ZeroXZeroProvider(PasteProvider):
|
||||
name = "0x0.st"
|
||||
priority = 30
|
||||
endpoint = "https://0x0.st"
|
||||
|
||||
@staticmethod
|
||||
def parse_response(body: bytes | str, headers: dict[str, str] | None = None) -> str:
|
||||
text = body.decode("utf-8", errors="replace") if isinstance(body, bytes) else body
|
||||
url = first_url(text.strip())
|
||||
if not url:
|
||||
raise ProviderError("0x0.st did not return a URL")
|
||||
return url
|
||||
|
||||
def paste(self, text: str, *, timeout: float) -> ProviderResult:
|
||||
data, headers = multipart_form_data(
|
||||
field_name="file",
|
||||
filename="paste.txt",
|
||||
content=text.encode("utf-8"),
|
||||
content_type="text/plain; charset=utf-8",
|
||||
)
|
||||
resp = http_request(self.endpoint, method="POST", data=data, headers=headers, timeout=timeout)
|
||||
if resp.status < 200 or resp.status >= 300:
|
||||
raise ProviderError(f"0x0.st returned HTTP {resp.status}")
|
||||
return self._result(self.parse_response(resp.body, dict(resp.headers)))
|
||||
|
||||
|
||||
class SnippetHostProvider(PasteProvider):
|
||||
name = "snippet.host"
|
||||
priority = 40
|
||||
endpoint = "https://snippet.host/"
|
||||
|
||||
@staticmethod
|
||||
def parse_response(body: bytes | str, headers: dict[str, str] | None = None) -> str:
|
||||
headers = {k.lower(): v for k, v in (headers or {}).items()}
|
||||
location = headers.get("location")
|
||||
if location:
|
||||
return urljoin(SnippetHostProvider.endpoint, location)
|
||||
text = body.decode("utf-8", errors="replace") if isinstance(body, bytes) else body
|
||||
final_url = headers.get("x-final-url")
|
||||
if final_url and final_url.rstrip("/") != SnippetHostProvider.endpoint.rstrip("/"):
|
||||
return final_url
|
||||
for url in _URL_RE.findall(text):
|
||||
if "snippet.host" in url:
|
||||
return url.rstrip(".,;)]")
|
||||
href_match = re.search(r'href=["\']([^"\']+)["\']', text, re.IGNORECASE)
|
||||
if href_match:
|
||||
return urljoin(SnippetHostProvider.endpoint, href_match.group(1))
|
||||
raise ProviderError("snippet.host did not return a snippet URL")
|
||||
|
||||
def paste(self, text: str, *, timeout: float) -> ProviderResult:
|
||||
data, headers = form_encode(
|
||||
{
|
||||
"content": text,
|
||||
"visibility": "2",
|
||||
"expires": "1month",
|
||||
"language": "plain text",
|
||||
}
|
||||
)
|
||||
resp = http_request(self.endpoint, method="POST", data=data, headers=headers, timeout=timeout)
|
||||
if resp.status < 200 or resp.status >= 400:
|
||||
raise ProviderError(f"snippet.host returned HTTP {resp.status}")
|
||||
return self._result(self.parse_response(resp.body, dict(resp.headers)))
|
||||
|
||||
|
||||
class PasteCentosProvider(PasteProvider):
|
||||
name = "paste.centos.org"
|
||||
priority = 50
|
||||
endpoint = "https://paste.centos.org/"
|
||||
|
||||
@staticmethod
|
||||
def _url_from_json(value: object) -> str | None:
|
||||
if isinstance(value, str):
|
||||
return value if value.startswith(("http://", "https://")) else None
|
||||
if isinstance(value, dict):
|
||||
for key in ("url", "view_url", "link", "href"):
|
||||
candidate = value.get(key)
|
||||
if isinstance(candidate, str) and candidate.startswith(("http://", "https://")):
|
||||
return candidate
|
||||
for key in ("id", "paste_id"):
|
||||
paste_id = value.get(key)
|
||||
if isinstance(paste_id, (str, int)):
|
||||
return f"https://paste.centos.org/view/{paste_id}"
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def parse_response(body: bytes | str, headers: dict[str, str] | None = None) -> str:
|
||||
headers = {k.lower(): v for k, v in (headers or {}).items()}
|
||||
final_url = headers.get("x-final-url")
|
||||
if final_url and final_url.rstrip("/") != "https://paste.centos.org":
|
||||
return final_url
|
||||
text = body.decode("utf-8", errors="replace") if isinstance(body, bytes) else body
|
||||
parsed = json_loads_maybe(text)
|
||||
url = PasteCentosProvider._url_from_json(parsed) if parsed is not None else None
|
||||
if not url:
|
||||
url = first_url(text)
|
||||
if not url:
|
||||
raise ProviderError("paste.centos.org did not return a view URL")
|
||||
return url
|
||||
|
||||
def paste(self, text: str, *, timeout: float) -> ProviderResult:
|
||||
data, headers = form_encode(
|
||||
{
|
||||
"code": text,
|
||||
"lang": "text",
|
||||
"expire": "1440",
|
||||
"submit": "submit",
|
||||
"name": "publicpaste",
|
||||
"title": "publicpaste",
|
||||
}
|
||||
)
|
||||
resp = http_request(self.endpoint, method="POST", data=data, headers=headers, timeout=timeout)
|
||||
if resp.status < 200 or resp.status >= 300:
|
||||
raise ProviderError(f"paste.centos.org returned HTTP {resp.status}")
|
||||
return self._result(self.parse_response(resp.body, dict(resp.headers)))
|
||||
|
||||
|
||||
class TermbinProvider(PasteProvider):
|
||||
name = "termbin.com"
|
||||
priority = 1000
|
||||
|
||||
@staticmethod
|
||||
def parse_response(body: bytes | str, headers: dict[str, str] | None = None) -> str:
|
||||
text = body.decode("utf-8", errors="replace") if isinstance(body, bytes) else body
|
||||
url = first_url(text.strip())
|
||||
if not url:
|
||||
raise ProviderError("termbin.com did not return a URL")
|
||||
return url
|
||||
|
||||
def paste(self, text: str, *, timeout: float) -> ProviderResult:
|
||||
response = tcp_send("termbin.com", 9999, text.encode("utf-8") + b"\n", timeout=timeout)
|
||||
return self._result(self.parse_response(response))
|
||||
|
||||
|
||||
PROVIDER_CLASSES: tuple[type[PasteProvider], ...] = (
|
||||
PasteRsProvider,
|
||||
DpasteProvider,
|
||||
ZeroXZeroProvider,
|
||||
SnippetHostProvider,
|
||||
PasteCentosProvider,
|
||||
TermbinProvider,
|
||||
)
|
||||
|
||||
|
||||
def default_providers() -> tuple[PasteProvider, ...]:
|
||||
return tuple(cls() for cls in sorted(PROVIDER_CLASSES, key=lambda item: item.priority))
|
||||
|
||||
|
||||
def provider_names() -> tuple[str, ...]:
|
||||
return tuple(provider.name for provider in default_providers())
|
||||
|
||||
|
||||
def select_providers(
|
||||
providers: list[str] | tuple[str, ...] | None = None,
|
||||
disabled_providers: list[str] | tuple[str, ...] | None = None,
|
||||
) -> tuple[PasteProvider, ...]:
|
||||
disabled = set(disabled_providers or ())
|
||||
selected = list(providers) if providers else list(provider_names())
|
||||
known = {provider.name: provider for provider in default_providers()}
|
||||
result: list[PasteProvider] = []
|
||||
unknown = [name for name in selected + list(disabled) if name not in known]
|
||||
if unknown:
|
||||
raise ProviderError(f"unknown provider(s): {', '.join(sorted(set(unknown)))}")
|
||||
for name in selected:
|
||||
if name not in disabled and name not in {provider.name for provider in result}:
|
||||
result.append(known[name])
|
||||
if not result:
|
||||
raise ProviderError("no providers enabled")
|
||||
return tuple(result)
|
||||
|
||||
|
||||
ProviderFactory = Callable[[], PasteProvider]
|
||||
@@ -0,0 +1,90 @@
|
||||
"""Validation and verification helpers for publicpaste."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import html
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from typing import Iterable
|
||||
from urllib.parse import urlparse
|
||||
|
||||
|
||||
class ValidationError(ValueError):
|
||||
"""Raised when a provider returns an invalid URL."""
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ProviderUrlRule:
|
||||
"""URL validation rule for a paste provider."""
|
||||
|
||||
provider: str
|
||||
hosts: tuple[str, ...]
|
||||
path_pattern: str | None = None
|
||||
schemes: tuple[str, ...] = ("http", "https")
|
||||
|
||||
def validate(self, url: str) -> str:
|
||||
parsed = urlparse(url.strip())
|
||||
if parsed.scheme not in self.schemes:
|
||||
raise ValidationError(f"{self.provider}: URL must use http or https")
|
||||
host = (parsed.hostname or "").lower()
|
||||
if host not in self.hosts:
|
||||
raise ValidationError(f"{self.provider}: unexpected URL host {host or '<missing>'}")
|
||||
if self.path_pattern is not None and not re.fullmatch(self.path_pattern, parsed.path or ""):
|
||||
raise ValidationError(f"{self.provider}: unexpected URL path {parsed.path or '<missing>'}")
|
||||
return parsed.geturl()
|
||||
|
||||
|
||||
RULES: dict[str, ProviderUrlRule] = {
|
||||
"paste.rs": ProviderUrlRule("paste.rs", ("paste.rs",), r"/[A-Za-z0-9._~!$&'()*+,;=:@%-]+"),
|
||||
"dpaste.org": ProviderUrlRule("dpaste.org", ("dpaste.org", "www.dpaste.org"), r"/[A-Za-z0-9]+/?"),
|
||||
"0x0.st": ProviderUrlRule("0x0.st", ("0x0.st",), r"/[A-Za-z0-9._~-]+"),
|
||||
"snippet.host": ProviderUrlRule("snippet.host", ("snippet.host", "www.snippet.host"), r"/[A-Za-z0-9._~/-]+"),
|
||||
"paste.centos.org": ProviderUrlRule("paste.centos.org", ("paste.centos.org",), r"/(?:view/)?[A-Za-z0-9._~-]+/?"),
|
||||
"termbin.com": ProviderUrlRule("termbin.com", ("termbin.com",), r"/[A-Za-z0-9._~-]+"),
|
||||
}
|
||||
|
||||
|
||||
def validate_url(provider: str, url: str) -> str:
|
||||
"""Validate and normalize a provider URL."""
|
||||
|
||||
rule = RULES.get(provider)
|
||||
if rule is None:
|
||||
raise ValidationError(f"unknown provider {provider}")
|
||||
return rule.validate(url)
|
||||
|
||||
|
||||
def text_probe_fragments(text: str, *, max_len: int = 80) -> tuple[str, ...]:
|
||||
"""Return short fragments useful for verifying a rendered paste."""
|
||||
|
||||
normalized = " ".join(text.split())
|
||||
if not normalized:
|
||||
return ("",)
|
||||
candidates = [normalized[:max_len]]
|
||||
if len(normalized) > max_len:
|
||||
midpoint = max(0, (len(normalized) // 2) - (max_len // 2))
|
||||
candidates.append(normalized[midpoint : midpoint + max_len])
|
||||
return tuple(dict.fromkeys(candidates))
|
||||
|
||||
|
||||
def body_contains_text_fragment(body: bytes | str, text: str) -> bool:
|
||||
"""Return whether a response body contains an original text fragment.
|
||||
|
||||
The check tolerates HTML escaping and whitespace differences.
|
||||
"""
|
||||
|
||||
if isinstance(body, bytes):
|
||||
body_text = body.decode("utf-8", errors="replace")
|
||||
else:
|
||||
body_text = body
|
||||
decoded = html.unescape(body_text)
|
||||
compact_body = " ".join(decoded.split())
|
||||
for fragment in text_probe_fragments(text):
|
||||
if fragment and fragment in compact_body:
|
||||
return True
|
||||
return text == "" and compact_body == ""
|
||||
|
||||
|
||||
def available_provider_names(names: Iterable[str]) -> tuple[str, ...]:
|
||||
"""Return provider names as a stable tuple."""
|
||||
|
||||
return tuple(names)
|
||||
@@ -0,0 +1,33 @@
|
||||
[build-system]
|
||||
requires = ["setuptools>=61"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "publicpaste"
|
||||
version = "0.1.0"
|
||||
description = "Paste text to the first working public paste provider."
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.9"
|
||||
license = { text = "MIT" }
|
||||
authors = [{ name = "publicpaste contributors" }]
|
||||
keywords = ["paste", "cli", "public", "snippet"]
|
||||
classifiers = [
|
||||
"Development Status :: 3 - Alpha",
|
||||
"Environment :: Console",
|
||||
"Intended Audience :: Developers",
|
||||
"License :: OSI Approved :: MIT License",
|
||||
"Programming Language :: Python :: 3",
|
||||
"Programming Language :: Python :: 3 :: Only",
|
||||
"Topic :: Utilities",
|
||||
]
|
||||
dependencies = []
|
||||
|
||||
[project.scripts]
|
||||
publicpaste = "publicpaste.cli:main"
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
include = ["publicpaste*"]
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
addopts = "-q"
|
||||
testpaths = ["tests"]
|
||||
@@ -0,0 +1,35 @@
|
||||
# publicpaste skill
|
||||
|
||||
Use `publicpaste` when you need to publish non-sensitive text to a simple public paste service and return a direct URL quickly.
|
||||
|
||||
## Capabilities
|
||||
|
||||
- Python 3, standard library only at runtime.
|
||||
- CLI: `publicpaste "text"`, `echo text | publicpaste`, `publicpaste --file a.txt`.
|
||||
- API: `paste_text(text, timeout=8.0, overall_timeout=12.0, providers=None, disabled_providers=None, verify=False)`.
|
||||
- Runs enabled providers concurrently and returns the first successful validated URL.
|
||||
- Aggregates provider errors when all providers fail.
|
||||
- Optional `verify=True` GETs the returned URL, requires HTTP 2xx, and checks that the body contains a fragment of the original text with HTML unescaping tolerated.
|
||||
|
||||
## Providers
|
||||
|
||||
Default providers are:
|
||||
|
||||
- `paste.rs`
|
||||
- `dpaste.org`
|
||||
- `0x0.st`
|
||||
- `snippet.host`
|
||||
- `paste.centos.org` (browser-compatible form endpoint; `/api/create` may require an API key)
|
||||
- `termbin.com` (low priority, TCP 9999)
|
||||
|
||||
Use `--list-providers`, `--provider NAME`, and `--disable-provider NAME` to inspect or tune provider selection.
|
||||
|
||||
## Excluded services
|
||||
|
||||
Do not add PrivateBin, privydrop, clipboard-style services, or similar services unless they provide a stable, documented, non-interactive API that returns a direct public URL from a simple request. They are intentionally excluded when they depend on browser JavaScript, P2P flows, access codes, encrypted URL fragments, unstable APIs, or workflows that are not reliably automatable from the Python standard library.
|
||||
|
||||
Initially reviewed but excluded by default: `clipboard.im`, `clipboard.run`, `clipboardify.com`, `clipboardonline.com`, `privydrop.app`, `privatebin.net`, `paste.drhack.net`, `pasteme.cn`, `paste.sdjz.wiki`, and `myonlineclipboard.com`.
|
||||
|
||||
## Testing guidance
|
||||
|
||||
Tests should be unit tests only. Do not hit external networks. Mock provider behavior for concurrency and CLI tests, and test provider response parsing through each provider's parsing method.
|
||||
@@ -0,0 +1,116 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
from publicpaste.cli import main, read_input
|
||||
from publicpaste.core import PasteError, paste_text
|
||||
from publicpaste.providers import (
|
||||
DpasteProvider,
|
||||
PasteCentosProvider,
|
||||
PasteProvider,
|
||||
PasteRsProvider,
|
||||
ProviderError,
|
||||
ProviderResult,
|
||||
SnippetHostProvider,
|
||||
TermbinProvider,
|
||||
ZeroXZeroProvider,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("parser", "body", "headers", "expected"),
|
||||
[
|
||||
(PasteRsProvider.parse_response, "https://paste.rs/abc\n", {}, "https://paste.rs/abc"),
|
||||
(DpasteProvider.parse_response, b"https://dpaste.org/ABC123\n", {}, "https://dpaste.org/ABC123"),
|
||||
(ZeroXZeroProvider.parse_response, "https://0x0.st/abcd.txt\n", {}, "https://0x0.st/abcd.txt"),
|
||||
(SnippetHostProvider.parse_response, "", {"Location": "/abc123"}, "https://snippet.host/abc123"),
|
||||
(
|
||||
SnippetHostProvider.parse_response,
|
||||
'<a href="https://snippet.host/xyz">snippet</a>',
|
||||
{},
|
||||
"https://snippet.host/xyz",
|
||||
),
|
||||
(
|
||||
PasteCentosProvider.parse_response,
|
||||
'{"url":"https://paste.centos.org/view/42"}',
|
||||
{},
|
||||
"https://paste.centos.org/view/42",
|
||||
),
|
||||
(PasteCentosProvider.parse_response, '{"id": 42}', {}, "https://paste.centos.org/view/42"),
|
||||
(TermbinProvider.parse_response, "https://termbin.com/abcd\n", {}, "https://termbin.com/abcd"),
|
||||
],
|
||||
)
|
||||
def test_provider_parse_response(parser, body: bytes | str, headers: dict[str, str], expected: str) -> None:
|
||||
assert parser(body, headers) == expected
|
||||
|
||||
|
||||
def test_provider_parse_response_rejects_missing_url() -> None:
|
||||
with pytest.raises(ProviderError):
|
||||
PasteRsProvider.parse_response("not a url")
|
||||
|
||||
|
||||
class FakeProvider(PasteProvider):
|
||||
name = "fake"
|
||||
|
||||
def __init__(self, url: str | None = None, delay: float = 0.0, error: str | None = None) -> None:
|
||||
self.url = url
|
||||
self.delay = delay
|
||||
self.error = error
|
||||
|
||||
def paste(self, text: str, *, timeout: float) -> ProviderResult:
|
||||
if self.delay:
|
||||
time.sleep(self.delay)
|
||||
if self.error:
|
||||
raise ProviderError(self.error)
|
||||
assert self.url is not None
|
||||
return ProviderResult(provider=self.name, url=self.url)
|
||||
|
||||
|
||||
def test_paste_text_returns_first_success() -> None:
|
||||
slow = FakeProvider("https://paste.rs/slow", delay=0.2)
|
||||
fast = FakeProvider("https://paste.rs/fast", delay=0.01)
|
||||
result = paste_text("hello", providers=[slow, fast], overall_timeout=1.0)
|
||||
assert result.url == "https://paste.rs/fast"
|
||||
assert result.provider == "fake"
|
||||
|
||||
|
||||
def test_paste_text_aggregates_all_failures() -> None:
|
||||
with pytest.raises(PasteError) as exc_info:
|
||||
paste_text(
|
||||
"hello",
|
||||
providers=[FakeProvider(error="boom"), FakeProvider(error="nope")],
|
||||
overall_timeout=1.0,
|
||||
)
|
||||
assert "all providers failed" in str(exc_info.value)
|
||||
assert exc_info.value.errors == {"fake": "nope"} or exc_info.value.errors == {"fake": "boom"}
|
||||
|
||||
|
||||
def test_cli_reads_positional_text(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
|
||||
def fake_paste_text(text: str, **kwargs):
|
||||
assert text == "hello"
|
||||
return type("Result", (), {"url": "https://paste.rs/abc", "provider": "paste.rs"})()
|
||||
|
||||
monkeypatch.setattr("publicpaste.cli.paste_text", fake_paste_text)
|
||||
assert main(["hello"]) == 0
|
||||
assert capsys.readouterr().out == "https://paste.rs/abc\n"
|
||||
|
||||
|
||||
def test_cli_json_from_stdin(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None:
|
||||
def fake_paste_text(text: str, **kwargs):
|
||||
assert text == "from stdin"
|
||||
return type("Result", (), {"url": "https://paste.rs/abc", "provider": "paste.rs"})()
|
||||
|
||||
monkeypatch.setattr("publicpaste.cli.paste_text", fake_paste_text)
|
||||
monkeypatch.setattr("sys.stdin", type("Stdin", (), {"read": lambda self: "from stdin"})())
|
||||
assert main(["--json"]) == 0
|
||||
assert '"provider": "paste.rs"' in capsys.readouterr().out
|
||||
|
||||
|
||||
def test_read_input_rejects_text_and_file(tmp_path) -> None:
|
||||
path = tmp_path / "a.txt"
|
||||
path.write_text("file", encoding="utf-8")
|
||||
args = type("Args", (), {"text": "hello", "file": path})()
|
||||
with pytest.raises(ValueError):
|
||||
read_input(args, stdin=None) # type: ignore[arg-type]
|
||||
@@ -0,0 +1,42 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from publicpaste.validation import ValidationError, body_contains_text_fragment, validate_url
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("provider", "url"),
|
||||
[
|
||||
("paste.rs", "https://paste.rs/abc123"),
|
||||
("dpaste.org", "https://dpaste.org/ABC123"),
|
||||
("0x0.st", "https://0x0.st/abcd.txt"),
|
||||
("snippet.host", "https://snippet.host/abcdef"),
|
||||
("paste.centos.org", "https://paste.centos.org/view/12345"),
|
||||
("termbin.com", "http://termbin.com/abc"),
|
||||
],
|
||||
)
|
||||
def test_validate_url_accepts_provider_urls(provider: str, url: str) -> None:
|
||||
assert validate_url(provider, url) == url
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("provider", "url"),
|
||||
[
|
||||
("paste.rs", "ftp://paste.rs/abc"),
|
||||
("paste.rs", "https://example.com/abc"),
|
||||
("0x0.st", "https://0x0.st/"),
|
||||
("dpaste.org", "https://dpaste.org/a b"),
|
||||
],
|
||||
)
|
||||
def test_validate_url_rejects_bad_urls(provider: str, url: str) -> None:
|
||||
with pytest.raises(ValidationError):
|
||||
validate_url(provider, url)
|
||||
|
||||
|
||||
def test_body_contains_text_fragment_tolerates_html_escape() -> None:
|
||||
assert body_contains_text_fragment(b"<pre>hello & goodbye</pre>", "hello & goodbye")
|
||||
|
||||
|
||||
def test_body_contains_text_fragment_rejects_missing_text() -> None:
|
||||
assert not body_contains_text_fragment("something else", "hello world")
|
||||
Reference in New Issue
Block a user