1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
| from __future__ import annotations
from typing import Any
from tantoc2.server.agentless_base import (
AgentlessModuleBase,
AgentlessMetadata,
AgentlessResult,
AgentlessTarget,
)
from tantoc2.server.module_base import OptionSchema
class LDAPEnumModule(AgentlessModuleBase):
@classmethod
def metadata(cls) -> AgentlessMetadata:
return AgentlessMetadata(
name="ldap_enum",
description="Enumerate LDAP directories",
author="Your Name",
protocol="ldap",
operations=["users", "groups", "computers"],
connection_params={
"host": OptionSchema(
name="host", type="str",
description="LDAP server", required=True,
),
"port": OptionSchema(
name="port", type="int",
description="LDAP port", required=True,
default=389,
),
},
options_schema={
"base_dn": OptionSchema(
name="base_dn", type="str",
description="Base DN for search",
required=True,
),
"filter": OptionSchema(
name="filter", type="str",
description="LDAP filter",
required=False,
default="(objectClass=*)",
),
},
mitre_attack=["T1087"],
dependencies=["ldap3"],
)
def execute(
self,
operation: str,
targets: list[AgentlessTarget],
options: dict[str, Any],
credentials: dict[str, dict[str, str]] | None = None,
proxy: dict[str, Any] | None = None,
) -> list[AgentlessResult]:
results = []
for target in targets:
try:
result = self._run_single(
operation, target, options, credentials, proxy
)
results.append(result)
except Exception as e:
results.append(AgentlessResult(
target=target, success=False, error=str(e),
))
return results
def _run_single(self, operation, target, options, credentials, proxy):
import ldap3
cred = None
if credentials and target.credential_id:
cred = credentials[target.credential_id]
server = ldap3.Server(target.host, port=target.port or 389)
conn = ldap3.Connection(
server,
user=cred["username"] if cred else None,
password=cred["secret"] if cred else None,
auto_bind=True,
)
conn.search(options["base_dn"], options["filter"])
return AgentlessResult(
target=target,
success=True,
data={"entries": [str(e) for e in conn.entries]},
)
|