Skip to main content
  1. Documentation/
  2. Developer Guide/

Building Tool (Agentless) Plugins

Table of Contents
Tool plugins execute directly from the teamserver against remote services. No agent deployment required — ideal for initial access, lateral movement, and service enumeration over any protocol.

How Tool Plugins Work
#

A tool plugin (called an “agentless module” internally) is a Python class that:

  1. Declares its protocol, operations, and argument schemas as class attributes
  2. Implements handle_<operation> methods — one per supported operation
  3. Receives decrypted credentials from the credential store
  4. Optionally contributes discovered credentials back to the store

The base class auto-discovers commands from class attributes and auto-dispatches execute() to the correct handle_* method. You do not need to override metadata() or execute().

Base Class
#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# tantoc2/src/tantoc2/server/agentless_base.py

class AgentlessModuleBase(PluginBase):
    # Class-level metadata (set by subclasses)
    name: ClassVar[str] = ""
    description: ClassVar[str] = ""
    protocol: ClassVar[str] = ""      # "ssh", "smb", "ldap", etc.
    author: ClassVar[str] = ""
    supports_shell: ClassVar[bool] = False
    mitre_attack: ClassVar[list[str]] = []
    dependencies: ClassVar[list[str]] = []

    # Auto-discovered from Command class attributes
    _discovered_commands: ClassVar[dict[str, Command]] = {}

__init_subclass__ scans for Command class attributes and populates _discovered_commands. The metadata() method builds AgentlessMetadata from class attributes and discovered commands automatically.

Step-by-Step: Writing a Tool Plugin
#

Step 1: Create the project structure
#

1
2
3
4
5
6
7
8
tantoc2-tool-winrm/
  pyproject.toml
  src/
    tantoc2_tool_winrm/
      __init__.py
      winrm_exec.py
  tests/
    test_winrm_exec.py

Step 2: Write pyproject.toml
#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "tantoc2-tool-winrm"
version = "0.1.0"
description = "TantoC2 WinRM remote execution tool"
requires-python = ">=3.11"
dependencies = [
    "tantoc2",
    "pywinrm>=0.4",
]

[project.optional-dependencies]
proxy = ["PySocks"]
dev = ["pytest>=8.0", "pytest-cov>=5.0"]

[project.entry-points."tantoc2.agentless_modules"]
winrm = "tantoc2_tool_winrm.winrm_exec:WinRMExecModule"

[tool.hatch.build.targets.wheel]
packages = ["src/tantoc2_tool_winrm"]

Step 3: Implement AgentlessModuleBase
#

Use class attributes for metadata, Command class attributes for operations, and handle_<operation> methods for execution.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from __future__ import annotations

import logging
from typing import Any, ClassVar

from tantoc2.server.agentless_base import (
    AgentlessModuleBase,
    AgentlessResult,
    AgentlessTarget,
)
from tantoc2.server.command_schema import Command
from tantoc2.server.module_base import ExtractedCredential

logger = logging.getLogger(__name__)


class WinRMExecModule(AgentlessModuleBase):
    """Execute commands on Windows hosts via WinRM."""

    # -- Class-level metadata --
    name: ClassVar[str] = "winrm"
    description: ClassVar[str] = "Execute commands on Windows hosts via WinRM/PS Remoting"
    author: ClassVar[str] = "Your Name"
    protocol: ClassVar[str] = "winrm"
    mitre_attack: ClassVar[list[str]] = ["T1021.006"]
    dependencies: ClassVar[list[str]] = ["pywinrm>=0.4"]

    # -- Command declarations (one per operation, auto-discovered) --

    exec = (
        Command("exec", "Execute command via WinRM")
        .arg("command", type="str", desc="Command to execute")
        .arg("use_ssl", type="bool", desc="Use HTTPS (port 5986)", default=False,
             positional=False)
    )

    ps = (
        Command("ps", "Execute PowerShell script via WinRM")
        .arg("command", type="str", desc="PowerShell command to execute")
    )

    # -- Handle methods (auto-dispatched by execute()) --

    def handle_exec(
        self,
        targets: list[AgentlessTarget],
        options: dict[str, Any],
        *,
        credentials: dict[str, dict[str, str]] | None = None,
        proxy: dict[str, Any] | None = None,
    ) -> list[AgentlessResult]:
        """Execute a WinRM command against each target."""
        results: list[AgentlessResult] = []
        for target in targets:
            try:
                results.append(
                    self._execute_single("exec", target, options, credentials, proxy)
                )
            except Exception as exc:
                logger.exception("WinRM exec failed for %s", target.host)
                results.append(AgentlessResult(
                    target=target, success=False, error=str(exc)
                ))
        return results

    def handle_ps(
        self,
        targets: list[AgentlessTarget],
        options: dict[str, Any],
        *,
        credentials: dict[str, dict[str, str]] | None = None,
        proxy: dict[str, Any] | None = None,
    ) -> list[AgentlessResult]:
        """Execute a PowerShell command against each target."""
        results: list[AgentlessResult] = []
        for target in targets:
            try:
                results.append(
                    self._execute_single("ps", target, options, credentials, proxy)
                )
            except Exception as exc:
                logger.exception("WinRM ps failed for %s", target.host)
                results.append(AgentlessResult(
                    target=target, success=False, error=str(exc)
                ))
        return results

    # -- Internal helpers --

    def _execute_single(
        self,
        operation: str,
        target: AgentlessTarget,
        options: dict[str, Any],
        credentials: dict[str, dict[str, str]] | None,
        proxy: dict[str, Any] | None,
    ) -> AgentlessResult:
        import winrm

        port = target.port or (5986 if options.get("use_ssl") else 5985)
        scheme = "https" if options.get("use_ssl") else "http"

        username, password = None, None
        if target.credential_id and credentials:
            cred = credentials.get(target.credential_id, {})
            username = cred.get("username")
            password = cred.get("secret")

        if not username:
            return AgentlessResult(
                target=target, success=False, error="No credentials provided"
            )

        session = winrm.Session(
            f"{scheme}://{target.host}:{port}/wsman",
            auth=(username, password),
        )
        command = options["command"]
        response = session.run_ps(command) if operation == "ps" else session.run_cmd(command)

        success = response.status_code == 0
        return AgentlessResult(
            target=target,
            success=success,
            data={
                "exit_code": response.status_code,
                "stdout": response.std_out.decode("utf-8", errors="replace"),
                "stderr": response.std_err.decode("utf-8", errors="replace"),
            },
            raw_output=response.std_out.decode("utf-8", errors="replace"),
            error=response.std_err.decode("utf-8", errors="replace") if not success else None,
        )

The handle_<operation> Contract
#

The base class execute() method dispatches to handle_<operation> based on the operation name. Each handler receives the same parameters:

ParameterTypeDescription
targetslist[AgentlessTarget]Each has host, optional port, optional credential_id.
optionsdict[str, Any]Validated per the Command schema. Required options are already checked.
credentialsdict[str, dict[str, str]] | NoneMaps credential_id → {"username", "secret", "cred_type", "domain", ...}. Secrets are decrypted by the manager before this call.
proxydict[str, Any] | NoneProxy configuration.

Return exactly one AgentlessResult per target — even on failure. Never raise from a handler; catch exceptions per-target and return AgentlessResult(success=False, error=str(exc)).

AgentlessMetadata
#

The base class builds AgentlessMetadata automatically from class attributes and discovered Command instances. You only need to inspect it if you have special requirements.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
@dataclass
class AgentlessMetadata:
    name: str
    description: str
    protocol: str
    operations: list[str]           # derived from Command class attributes
    command_schemas: dict[str, CommandSchema]
    author: str = ""
    supports_shell: bool = False
    mitre_attack: list[str] = field(default_factory=list)
    dependencies: list[str] = field(default_factory=list)

Credential Integration
#

Reading Credentials
#

When a target has a credential_id, the manager decrypts the credential and passes it in credentials:

1
2
3
4
5
6
if target.credential_id and credentials:
    cred = credentials[target.credential_id]
    username = cred["username"]
    secret = cred["secret"]       # plaintext password, hash, or key PEM
    cred_type = cred["cred_type"] # "plaintext", "hash", "ssh_key", "api_key", etc.
    domain = cred.get("domain")   # may be None

Contributing Credentials
#

Return ExtractedCredential objects in results to populate the credential store:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from tantoc2.server.module_base import ExtractedCredential

AgentlessResult(
    target=target,
    success=True,
    data={"users": found_users},
    credentials=[
        ExtractedCredential(
            cred_type="plaintext",
            username="administrator",
            secret="P@ssw0rd",
            domain="CORP",
            source_host=target.host,
            notes="Extracted from /etc/shadow",
        ),
        ExtractedCredential(
            cred_type="hash",
            username="serviceaccount",
            secret="aad3b435b51404eeaad3b435b51404ee:31d6cfe0d16ae931b73c59d7e0c089c0",
            domain="CORP",
            source_host=target.host,
        ),
    ],
)

cred_type values: plaintext, hash, ticket, token, ssh_key, api_key, certificate.

Proxy Support
#

When a proxy is configured, the proxy dict contains:

1
2
3
4
5
6
7
{
    "proxy_type": "socks5",     # "socks4", "socks5", or "ssh_tunnel"
    "host": "10.0.0.1",
    "port": 1080,
    "username": "user",          # optional
    "credential_id": "abc123",   # optional — look up in credentials
}

Route your connections through the proxy using PySocks:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
def _create_proxy_socket(self, proxy: dict, dest_host: str, dest_port: int):
    try:
        import socks
    except ImportError:
        raise ImportError("PySocks is required for proxy support") from None

    proxy_type = {"socks4": socks.SOCKS4, "socks5": socks.SOCKS5}.get(
        proxy.get("proxy_type", "socks5"), socks.SOCKS5
    )
    sock = socks.socksocket()
    sock.set_proxy(
        proxy_type,
        proxy["host"],
        proxy["port"],
        username=proxy.get("username"),
    )
    sock.connect((dest_host, dest_port))
    return sock

Make proxy support optional — declare PySocks as an optional dependency:

1
2
[project.optional-dependencies]
proxy = ["PySocks"]

Reference Implementation: SSH Tool
#

Source: tools/ssh/src/tantoc2_tool_ssh/ssh_command.py

1
2
3
4
tools/ssh/
  pyproject.toml              # entry-point: tantoc2.agentless_modules → ssh
  src/tantoc2_tool_ssh/
    ssh_command.py            # SSHCommandModule

Key points from the SSH reference (uses the v2 class-attribute pattern):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class SSHCommandModule(AgentlessModuleBase):
    name = "ssh"
    description = "Execute commands on remote hosts via SSH"
    protocol = "ssh"
    supports_shell = True

    exec = (
        Command("exec", "Execute remote command")
        .arg("command", type="str", desc="Command to execute", required=True)
    )
    upload = (
        Command("upload", "Upload file to remote host")
        .file("local_path", desc="Local file to upload")
        .path("remote_path", desc="Remote destination path", required=True)
    )
    download = (
        Command("download", "Download file from remote host")
        .path("remote_path", desc="Remote file path", required=True)
        .path("local_path", desc="Local save path", required=False, default=".")
    )

    def handle_exec(self, targets, options, *, credentials=None, proxy=None):
        ...

    def handle_upload(self, targets, options, *, credentials=None, proxy=None):
        ...

    def handle_download(self, targets, options, *, credentials=None, proxy=None):
        ...

    def _execute_single(self, operation, target, options, credentials, proxy):
        # Credential resolution
        cred_type = cred.get("cred_type", "plaintext")
        if cred_type == "ssh_key":
            pkey = self._load_private_key(cred["secret"])   # PEM → paramiko key
        else:
            password = cred["secret"]

        # Optional SOCKS proxy
        sock = None
        if proxy:
            sock = self._create_proxy_socket(proxy, target.host, port)

        client.connect(..., sock=sock)

The _load_private_key method tries RSA, Ed25519, ECDSA, and DSS key types:

1
2
3
4
5
6
7
8
9
@staticmethod
def _load_private_key(key_data: str):
    import paramiko, io
    for cls in [paramiko.RSAKey, paramiko.Ed25519Key, paramiko.ECDSAKey, paramiko.DSSKey]:
        try:
            return cls.from_private_key(io.StringIO(key_data))
        except (paramiko.SSHException, ValueError):
            continue
    raise ValueError("Unable to parse SSH private key")

Testing
#

Unit Tests
#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import pytest
from tantoc2_tool_winrm.winrm_exec import WinRMExecModule
from tantoc2.server.agentless_base import AgentlessTarget


def test_metadata():
    meta = WinRMExecModule.metadata()
    assert meta.name == "winrm"
    assert "exec" in meta.operations
    assert "ps" in meta.operations
    assert meta.protocol == "winrm"


def test_handle_exec_missing_credentials():
    module = WinRMExecModule()
    target = AgentlessTarget(host="10.0.0.1", credential_id=None)
    results = module.execute("exec", [target], {"command": "whoami"})
    assert len(results) == 1
    assert not results[0].success
    assert "No credentials" in results[0].error


def test_execute_handles_exception():
    module = WinRMExecModule()
    target = AgentlessTarget(host="10.0.0.1", credential_id="cred-1")
    credentials = {"cred-1": {"username": "admin", "secret": "wrong", "cred_type": "plaintext"}}

    from unittest.mock import patch
    with patch("winrm.Session") as mock_session:
        mock_session.side_effect = Exception("Connection refused")
        results = module.execute("exec", [target], {"command": "whoami"}, credentials)

    assert len(results) == 1
    assert not results[0].success
    assert "Connection refused" in results[0].error

Integration Testing
#

1
2
3
4
5
hatch run dev:pip install -e ./tantoc2-tool-winrm/
# In the running teamserver CLI:
tantoc2> tools list
# Should show: winrm
tantoc2> tools run winrm exec --host 10.0.0.1 --credential-id <id> --command whoami

Deployment
#

Method 1: Standalone Package (Recommended)#

1
2
3
hatch build
pip install dist/tantoc2_tool_winrm-0.1.0-py3-none-any.whl
# Or drop the .whl in the plugin inbox

Method 2: File Drop
#

1
2
cp winrm_exec.py tantoc2/plugins/agentless/
tantoc2> tools refresh

Common Pitfalls
#

Raising from a handler — the manager does not catch exceptions from handle_* methods unless you wrap them. Always use try/except per-target inside each handler.

Returning wrong number of results — return exactly one result per target. Missing results cause the manager to skip auto-credential extraction for those targets.

Blocking forever — set socket timeouts. A hung handler blocks the entire tools execution queue.

dependencies mismatch — declare dependencies in both pyproject.toml (for installation) and the class-level dependencies attribute (for auto-install at discovery time).

Accessing secret before checking credentialscredentials is None when no credential IDs are on any target. Always check before subscripting.

See Plugin Packaging for the full packaging reference.