Building Transport Plugins

Transport plugins define how listeners accept and handle connections from agents. Build a custom transport to implement any C2 channel: DNS, named pipes, WebSockets, or domain fronting.

Minimal Example

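from __future__ import annotations
import socket
import threading
from typing import Any, Callable
from tantoc2.server.transports_module import TransportBase, TransportConfig


class DNSTransport(TransportBase):
    @classmethod
    def plugin_name(cls) -> str:
        return "dns"

    def __init__(
        self, config: TransportConfig, on_message: Callable[..., Any]
    ) -> None:
        self._config = config
        self._on_message = on_message
        self._running = False
        self._thread: threading.Thread | None = None
        self._socket: socket.socket | None = None
        # Responses waiting for each client's next query, guarded by _lock
        self._response_queue: dict[str, bytes] = {}
        self._lock = threading.Lock()

    def start(self) -> None:
        """Start listening for DNS queries."""
        # Bind here so that bind errors surface to the caller of start()
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._socket.bind((self._config.host, self._config.port))
        self._socket.settimeout(1.0)  # lets _serve() notice stop() promptly
        self._running = True
        self._thread = threading.Thread(
            target=self._serve, daemon=True
        )
        self._thread.start()

    def stop(self) -> None:
        """Stop the listener and release the port."""
        self._running = False
        if self._thread:
            self._thread.join(timeout=5)
            self._thread = None
        if self._socket:
            self._socket.close()  # closing the socket releases the port
            self._socket = None

    def is_running(self) -> bool:
        return self._running

    def send(self, client_id: str, data: bytes) -> None:
        """Queue response data for a client.
        Returned on the client's next query."""
        with self._lock:
            self._response_queue[client_id] = data

    def _extract_payload(self, data: bytes) -> bytes:
        """Placeholder: decode the agent payload from a raw DNS query."""
        raise NotImplementedError

    def _serve(self) -> None:
        """Main listener loop (runs in background thread)."""
        # Loop: receive DNS query → extract payload → call on_message()
        # Queue response for next query from same client
        sock = self._socket
        while self._running and sock is not None:
            try:
                data, addr = sock.recvfrom(4096)
            except socket.timeout:
                continue  # no query yet; re-check self._running
            except OSError:
                break  # socket was closed by stop()
            client_id = f"{addr[0]}:{addr[1]}"
            payload = self._extract_payload(data)
            # Call the pipeline without holding the lock
            response = self._on_message(client_id, payload)
            if response:
                self.send(client_id, response)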

Interface Reference

TransportBase

Method                        Required  Description
plugin_name()                 Yes       Unique transport name (e.g., "dns", "websocket")
__init__(config, on_message)  Yes       Initialize with config and pipeline callback
start()                       Yes       Start listening in a background thread
stop()                        Yes       Stop listening and release the port
is_running()                  Yes       Whether the transport is actively accepting connections
send(client_id, data)         Yes       Queue response data for a client

TransportConfig

from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TransportConfig:
    host: str = "0.0.0.0"
    port: int = 443
    tls_enabled: bool = False
    tls_cert_file: str | None = None
    tls_key_file: str | None = None
    options: dict[str, Any] = field(default_factory=dict)

The options dict contains transport-specific configuration passed from the listener creation API.
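
As a rough sketch of how a transport might read its own settings from options: the keys recv_buffer_size and idle_timeout below are hypothetical examples, not names the framework defines, and the dataclass is repeated from above only so the snippet runs on its own.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class TransportConfig:
    # Local copy of the dataclass shown above, so this sketch runs standalone.
    host: str = "0.0.0.0"
    port: int = 443
    tls_enabled: bool = False
    tls_cert_file: str | None = None
    tls_key_file: str | None = None
    options: dict[str, Any] = field(default_factory=dict)


def read_options(config: TransportConfig) -> tuple[int, float]:
    """Pull transport-specific settings out of config.options with defaults.

    Both keys are hypothetical; pick whatever names suit your transport.
    """
    buffer_size = int(config.options.get("recv_buffer_size", 4096))
    idle_timeout = float(config.options.get("idle_timeout", 30.0))
    if buffer_size <= 0:
        raise ValueError("recv_buffer_size must be positive")
    return buffer_size, idle_timeout
```

Validating options once, in __init__ or start(), means a misconfigured listener fails when it is created rather than when the first agent connects.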

Message Callback

The on_message callback connects your transport to the message pipeline:

def on_message(client_id: str, data: bytes) -> bytes | None:
    """Process agent data through the pipeline.

    Args:
        client_id: Your transport-assigned client identifier (e.g., "IP:port")
        data: Raw bytes received from the agent (includes 20-byte wire header)

    Returns:
        Response bytes to send back to the agent, or None.
    """

The data you pass must begin with the full 20-byte wire header (4-byte magic + 16-byte session token), followed by the payload. The pipeline handles routing, decryption, and response generation.
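
A transport may want to drop obviously truncated input before it reaches the pipeline. The helper below is a minimal sketch based only on the header sizes stated above. The function name is hypothetical, and no particular magic value is assumed, since the pipeline does the real validation.

```python
from __future__ import annotations

MAGIC_LEN = 4    # magic bytes, per the header layout described above
TOKEN_LEN = 16   # session token bytes
HEADER_LEN = MAGIC_LEN + TOKEN_LEN  # 20 bytes in total


def split_wire_message(data: bytes) -> tuple[bytes, bytes, bytes]:
    """Split raw agent bytes into (magic, session_token, payload).

    Raises ValueError if the buffer is too short to hold the header.
    """
    if len(data) < HEADER_LEN:
        raise ValueError(
            f"need at least {HEADER_LEN} header bytes, got {len(data)}"
        )
    magic = data[:MAGIC_LEN]
    token = data[MAGIC_LEN:HEADER_LEN]
    payload = data[HEADER_LEN:]
    return magic, token, payload
```

Note that on_message() still expects the unsplit bytes. This helper is only for length checks or logging inside the transport.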

Key Requirements

Background Thread

Transports run in background daemon threads. The start() method must return immediately — spawn a thread for the actual listening loop.

Port Release

When stop() is called, the transport must release its port immediately. The listener manager calls stop() on shutdown and when operators explicitly stop a listener.
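
The two requirements above (a start() that returns immediately and a stop() that frees the port) can be sketched with a plain UDP socket. UdpListener is a hypothetical stand-in rather than a framework class. The short receive timeout is one possible way to let the loop notice a stop request; other designs, such as closing the socket from stop() to interrupt a blocking read, work too.

```python
from __future__ import annotations

import socket
import threading


class UdpListener:
    """Hypothetical stand-in for a transport, showing only start()/stop()."""

    def __init__(self, host: str, port: int) -> None:
        self._addr = (host, port)
        self._sock: socket.socket | None = None
        self._thread: threading.Thread | None = None
        self._running = False

    @property
    def port(self) -> int:
        # Actual bound port (useful when constructed with port 0).
        assert self._sock is not None
        return self._sock.getsockname()[1]

    def start(self) -> None:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.bind(self._addr)
        sock.settimeout(0.2)  # short timeout so the loop re-checks _running
        self._sock = sock
        self._running = True
        self._thread = threading.Thread(target=self._serve, daemon=True)
        self._thread.start()  # returns immediately; the loop runs in the thread

    def stop(self) -> None:
        self._running = False
        if self._thread is not None:
            self._thread.join(timeout=5)
            self._thread = None
        if self._sock is not None:
            self._sock.close()  # closing the socket is what frees the port
            self._sock = None

    def _serve(self) -> None:
        sock = self._sock
        while self._running and sock is not None:
            try:
                sock.recvfrom(4096)  # a real transport would process the data
            except socket.timeout:
                continue
            except OSError:
                break  # socket was closed underneath us
```

Binding inside start() rather than inside the thread means an "address already in use" error reaches the caller instead of dying silently in the background.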

Client Identification

Assign a consistent client_id per connection. For stateless protocols (HTTP, DNS), derive it from the remote address. For persistent connections (TCP, WebSocket), use the connection handle.

Thread Safety

Multiple agents may connect simultaneously. Use locks to protect shared state (response queues, connection maps). Call on_message() outside of locks to prevent deadlocks.
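
One way to follow this rule is to keep the lock inside a small queue object and never hold it across the callback. The sketch below uses hypothetical names (ResponseQueue, handle_packet). It assumes a non-reentrant threading.Lock, which is exactly the case where calling on_message() under the lock could deadlock if the pipeline calls back into send().

```python
from __future__ import annotations

import threading
from typing import Callable, Optional


class ResponseQueue:
    """Lock-protected map of client_id -> pending response bytes."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._pending: dict[str, bytes] = {}

    def put(self, client_id: str, data: bytes) -> None:
        with self._lock:
            self._pending[client_id] = data

    def pop(self, client_id: str) -> Optional[bytes]:
        with self._lock:
            return self._pending.pop(client_id, None)


def handle_packet(
    queue: ResponseQueue,
    on_message: Callable[[str, bytes], Optional[bytes]],
    client_id: str,
    payload: bytes,
) -> None:
    """Run the pipeline for one packet and queue any response."""
    # No lock is held here, so on_message() is free to call queue.put()
    # (as send() would) without deadlocking.
    response = on_message(client_id, payload)
    if response:
        queue.put(client_id, response)  # lock held only for the dict update
```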

TLS Support

If config.tls_enabled is True:

  1. Use config.tls_cert_file and config.tls_key_file if provided
  2. Otherwise, generate a self-signed certificate
  3. Wrap your socket/server with an SSL context
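
The steps above might look roughly like this with the standard ssl module. The function names are hypothetical. The standard library cannot generate certificates, so the self-signed fallback is passed in as a caller-supplied function. This sketch also assumes both files must be configured before it uses them.

```python
from __future__ import annotations

import ssl
from typing import Callable, Optional


def resolve_cert_paths(
    cert_file: Optional[str],
    key_file: Optional[str],
    generate_self_signed: Callable[[], tuple[str, str]],
) -> tuple[str, str]:
    """Return (cert_path, key_path), preferring operator-supplied files."""
    if cert_file and key_file:
        return cert_file, key_file
    # Fall back to a freshly generated pair (generator supplied by the caller).
    return generate_self_signed()


def build_server_context(cert_path: str, key_path: str) -> ssl.SSLContext:
    """Create a server-side SSL context loaded with the given certificate."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.load_cert_chain(certfile=cert_path, keyfile=key_path)
    return context


# Wrapping a listening socket (sketch):
#   tls_sock = context.wrap_socket(plain_sock, server_side=True)
```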

Built-in Transports

HTTP (http)

  • GET / → health check (200 OK)
  • POST / → agent check-in (body = raw wire bytes, response = queued tasks)
  • Background thread with http.server.HTTPServer
  • TLS: custom certs or auto-generated self-signed (2048-bit RSA, 365-day)
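
For orientation, the request handling listed above could be approximated as follows. This is an illustrative sketch built on http.server, not the built-in transport's actual source. make_handler and serve_in_background are hypothetical names, and unlike the list above the handler answers on every path, not only on /.

```python
from __future__ import annotations

import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Callable, Optional

OnMessage = Callable[[str, bytes], Optional[bytes]]


def make_handler(on_message: OnMessage) -> type[BaseHTTPRequestHandler]:
    class Handler(BaseHTTPRequestHandler):
        def do_GET(self) -> None:
            # Health check
            self._reply(200, b"OK")

        def do_POST(self) -> None:
            # Body is passed to the pipeline as-is; the reply is its response
            length = int(self.headers.get("Content-Length", 0))
            body = self.rfile.read(length)
            client_id = f"{self.client_address[0]}:{self.client_address[1]}"
            response = on_message(client_id, body) or b""
            self._reply(200, response)

        def _reply(self, status: int, body: bytes) -> None:
            self.send_response(status)
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def log_message(self, format: str, *args: object) -> None:
            pass  # silence the default stderr logging

    return Handler


def serve_in_background(host: str, port: int, on_message: OnMessage) -> HTTPServer:
    server = HTTPServer((host, port), make_handler(on_message))
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

Calling server.shutdown() followed by server.server_close() stops the loop and releases the port.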

TCP (tcp)

  • Length-prefixed messages: [4-byte big-endian length][payload]
  • Persistent connections with per-client read loop
  • TLS: same as HTTP
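
The length-prefixed framing described above can be sketched with struct and a blocking socket. The function names and the MAX_MESSAGE cap are assumptions made for this example, not framework constants.

```python
from __future__ import annotations

import socket
import struct

_LEN = struct.Struct(">I")  # 4-byte big-endian unsigned length prefix
MAX_MESSAGE = 16 * 1024 * 1024  # hypothetical sanity cap, not a framework limit


def recv_exact(conn: socket.socket, n: int) -> bytes:
    """Read exactly n bytes, or raise ConnectionError on early close."""
    chunks = []
    remaining = n
    while remaining:
        chunk = conn.recv(remaining)
        if not chunk:
            raise ConnectionError("peer closed the connection mid-message")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def read_message(conn: socket.socket) -> bytes:
    """Read one [length][payload] frame and return the payload."""
    (length,) = _LEN.unpack(recv_exact(conn, _LEN.size))
    if length > MAX_MESSAGE:
        raise ValueError(f"message of {length} bytes exceeds cap")
    return recv_exact(conn, length)


def send_message(conn: socket.socket, payload: bytes) -> None:
    """Write one [length][payload] frame."""
    conn.sendall(_LEN.pack(len(payload)) + payload)
```

Looping in recv_exact() matters because a single recv() call may return fewer bytes than requested, even on a healthy connection.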

Deployment

Place the file in plugins/transports/:

plugins/
  transports/
    dns_transport.py
    websocket_transport.py

The transport is available on the next server start. Operators create listeners using your transport name:

tantoc2> listeners create dns --name my-dns --port 53
tantoc2> listeners start my-dns

Design Patterns

Stateless Protocols (HTTP, DNS)

# Request-response: process message immediately
def handle_request(self, data, client_addr):
    client_id = f"{client_addr[0]}:{client_addr[1]}"
    response = self._on_message(client_id, data)
    return response  # Send back in the same request

Persistent Connections (TCP, WebSocket)

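# Long-lived connection: read loop per client
def handle_client(self, conn, addr):
    client_id = f"{addr[0]}:{addr[1]}"
    try:
        while self._running:
            data = self._read_message(conn)
            response = self._on_message(client_id, data)
            if response:
                self._send_message(conn, response)
    except OSError:
        pass  # peer disconnected or socket error: end this client's loop
    finally:
        conn.close()  # always release the connection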

Covert Channels (DNS, ICMP)

# Multi-packet reassembly: buffer fragments, deliver complete message
def handle_fragment(self, fragment, client_id):
    # The first fragment from a client creates its buffer
    buffer = self._buffers.setdefault(client_id, [])
    buffer.append(fragment)
    if self._is_complete(buffer):
        full_message = self._reassemble(buffer)
        self._buffers[client_id] = []
        response = self._on_message(client_id, full_message)
        if response:  # on_message() may return None
            self._queue_response(client_id, response)
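
The helpers _is_complete() and _reassemble() are left undefined above. One possible shape for them, assuming each fragment carries an index and a total count (a hypothetical layout chosen for this sketch, not a framework format), is a small per-client buffer:

```python
from __future__ import annotations

import threading
from typing import Optional


class FragmentBuffer:
    """Collects (index, total, chunk) fragments per client until complete."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._parts: dict[str, dict[int, bytes]] = {}

    def add(
        self, client_id: str, index: int, total: int, chunk: bytes
    ) -> Optional[bytes]:
        """Store one fragment; return the full message once all have arrived."""
        if not 0 <= index < total:
            raise ValueError("fragment index out of range")
        with self._lock:
            parts = self._parts.setdefault(client_id, {})
            parts[index] = chunk  # a duplicate fragment simply overwrites
            if not all(i in parts for i in range(total)):
                return None  # still waiting for at least one fragment
            del self._parts[client_id]  # reset for the next message
        # Join outside the lock; parts is no longer shared at this point.
        return b"".join(parts[i] for i in range(total))
```

Keying fragments by index rather than appending them in arrival order lets the buffer tolerate reordering and retransmission, both of which are common on datagram transports.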