Transport plugins define how listeners accept and handle connections from agents. Build a custom transport to implement any C2 channel — DNS, named pipes, websockets, or domain fronting.
Minimal Example
# 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
| from __future__ import annotations
import threading
from typing import Any, Callable
from tantoc2.server.transports_module import TransportBase, TransportConfig
class DNSTransport(TransportBase):
@classmethod
def plugin_name(cls) -> str:
return "dns"
def __init__(
self, config: TransportConfig, on_message: Callable[..., Any]
) -> None:
self._config = config
self._on_message = on_message
self._running = False
self._thread: threading.Thread | None = None
def start(self) -> None:
"""Start listening for DNS queries."""
self._running = True
self._thread = threading.Thread(
target=self._serve, daemon=True
)
self._thread.start()
def stop(self) -> None:
"""Stop the listener and release the port."""
self._running = False
if self._thread:
self._thread.join(timeout=5)
def is_running(self) -> bool:
return self._running
def send(self, client_id: str, data: bytes) -> None:
"""Queue response data for a client.
Returned on the client's next query."""
self._response_queue[client_id] = data
def _serve(self) -> None:
"""Main listener loop (runs in background thread)."""
# Bind UDP socket on config.port
# Loop: receive DNS query → extract payload → call on_message()
# Queue response for next query from same client
while self._running:
data, addr = self._socket.recvfrom(4096)
client_id = f"{addr[0]}:{addr[1]}"
payload = self._extract_payload(data)
response = self._on_message(client_id, payload)
if response:
self._response_queue[client_id] = response
|
Interface Reference
#TransportBase
#| Method | Required | Description |
|---|
plugin_name() | Yes | Unique transport name (e.g., "dns", "websocket") |
__init__(config, on_message) | Yes | Initialize with config and pipeline callback |
start() | Yes | Start listening in a background thread |
stop() | Yes | Stop listening and release the port |
is_running() | Yes | Whether the transport is actively accepting connections |
send(client_id, data) | Yes | Queue response data for a client |
TransportConfig
#1
2
3
4
5
6
7
8
| @dataclass
class TransportConfig:
host: str = "0.0.0.0"
port: int = 443
tls_enabled: bool = False
tls_cert_file: str | None = None
tls_key_file: str | None = None
options: dict[str, Any] = field(default_factory=dict)
|
The options dict contains transport-specific configuration passed from the listener creation API.
Message Callback
#The on_message callback connects your transport to the message pipeline:
1
2
3
4
5
6
7
8
9
10
| def on_message(client_id: str, data: bytes) -> bytes | None:
"""Process agent data through the pipeline.
Args:
client_id: Your transport-assigned client identifier (e.g., "IP:port")
data: Raw bytes received from the agent (includes 20-byte wire header)
Returns:
Response bytes to send back to the agent, or None.
"""
|
The data you pass must include the full wire header (4 bytes magic + 16 bytes session token + payload). The pipeline handles routing, decryption, and response generation.
Key Requirements
#Background Thread
#Transports run in background daemon threads. The start() method must return immediately — spawn a thread for the actual listening loop.
Port Release
#When stop() is called, the transport must release its port immediately. The listener manager calls stop() on shutdown and when operators explicitly stop a listener.
Client Identification
#Assign a consistent client_id per connection. For stateless protocols (HTTP, DNS), derive it from the remote address. For persistent connections (TCP, WebSocket), use the connection handle.
Thread Safety
#Multiple agents may connect simultaneously. Use locks to protect shared state (response queues, connection maps). Call on_message() outside of locks to prevent deadlocks.
TLS Support
#If config.tls_enabled is True:
- Use
config.tls_cert_file and config.tls_key_file if provided - Otherwise, generate a self-signed certificate
- Wrap your socket/server with an SSL context
Built-in Transports
#HTTP (http)
#GET / → health check (200 OK)POST / → agent check-in (body = raw wire bytes, response = queued tasks)- Background thread with
http.server.HTTPServer - TLS: custom certs or auto-generated self-signed (2048-bit RSA, 365-day)
TCP (tcp)
#- Length-prefixed messages:
[4-byte big-endian length][payload] - Persistent connections with per-client read loop
- TLS: same as HTTP
Deployment
#Place the file in plugins/transports/:
1
2
3
4
| plugins/
transports/
dns_transport.py
websocket_transport.py
|
The transport is available on the next server start. Operators create listeners using your transport name:
1
2
| tantoc2> listeners create dns --name my-dns --port 53
tantoc2> listeners start my-dns
|
Design Patterns
#Stateless Protocols (HTTP, DNS)
#1
2
3
4
5
| # Request-response: process message immediately
def handle_request(self, data, client_addr):
client_id = f"{client_addr[0]}:{client_addr[1]}"
response = self._on_message(client_id, data)
return response # Send back in the same request
|
Persistent Connections (TCP, WebSocket)
#1
2
3
4
5
6
7
8
| # Long-lived connection: read loop per client
def handle_client(self, conn, addr):
client_id = f"{addr[0]}:{addr[1]}"
while self._running:
data = self._read_message(conn)
response = self._on_message(client_id, data)
if response:
self._send_message(conn, response)
|
Covert Channels (DNS, ICMP)
#1
2
3
4
5
6
7
8
| # Multi-packet reassembly: buffer fragments, deliver complete message
def handle_fragment(self, fragment, client_id):
self._buffers[client_id].append(fragment)
if self._is_complete(self._buffers[client_id]):
full_message = self._reassemble(self._buffers[client_id])
self._buffers[client_id] = []
response = self._on_message(client_id, full_message)
self._queue_response(client_id, response)
|