The purpose of this article is to show how the principles of application architecture design are applied in practice.
1.1.1 What we want to get
Most often we need to collect information about:
- users, in particular:
– the total number of users;
– the list of user groups;
– the list of domain administrators;
– the list of Enterprise admins;
– on top of that, the groups that the user we are currently logged in as belongs to;
- computers, namely:
– the total number of active hosts in the domain;
– the list and number of servers;
– the list and number of workstations;
– the list and number of domain controllers;
– statistics on installed operating systems, which are very useful when looking for vulnerabilities (but more on that another time);
- shared network resources (shares), including which access rights (read, write) the current user has to them.
We also want to be able to do this quickly, in several threads, or, on the contrary, to behave very quietly and poll one host at a time over an arbitrary period of time.
Finally, we want the collected results to be sorted: users with users, admins with admins, shares with shares, and workstations, servers and domain controllers each in their own place.
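The two polling modes described above could be sketched roughly as follows. This is only an illustration under assumptions: `probe_host` is a hypothetical placeholder for the real per-host check, and the random pause between hosts is one possible reading of "quiet" mode, not something taken from the project.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List


def probe_host(host: str) -> str:
    """Hypothetical placeholder for the real per-host check (e.g. listing shares)."""
    return f"{host}: ok"


def poll_fast(hosts: Iterable[str], probe: Callable[[str], str], workers: int = 8) -> List[str]:
    """Fast mode: poll many hosts in parallel; results keep the input order."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(probe, hosts))


def poll_quiet(hosts: Iterable[str], probe: Callable[[str], str],
               min_delay: float = 0.0, max_delay: float = 0.0) -> List[str]:
    """Quiet mode: one host at a time, with a random pause after each host."""
    results = []
    for host in hosts:
        results.append(probe(host))
        time.sleep(random.uniform(min_delay, max_delay))
    return results
```

Both functions accept the same probe callable, so switching between the modes does not affect the code that does the actual work.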
1.2 The tools necessary to solve the problem.
It would be naive to believe that such a task has not arisen before. Accordingly, the methods of solving it should be publicly available. A few minutes of Googling and voila – a tool has been found that solves a similar problem. https://github.com/SecuProject/ADenum
We watch the video: it seems to do what it promises. We look at the code and cry. We open it again and cry bitter tears again. Listing the shortcomings of this project would take a separate article, so we will take them for granted and try not to repeat the mistakes of others. I will point them out in one way or another as we go along.
The main approaches to solving the problem will be discussed below.
1.2.1 LDAP
We open the project code and see that information about the network is obtained by executing LDAP queries.
We open the wiki and learn that LDAP (Lightweight Directory Access Protocol) is an application-level protocol for accessing the X.500 directory service, developed by the IETF as a lightweight version of the ITU-T DAP protocol. LDAP is a relatively simple protocol that runs over TCP/IP and supports authentication (bind), search and compare operations, as well as adding, changing or deleting records. Normally, an LDAP server accepts incoming connections on port 389 over TCP or UDP. For LDAP sessions encapsulated in SSL, port 636 is usually used.
Again we look at the code and the wiki, Google carefully, and conclude that to perform LDAP queries we need to:
- Establish a connection with the directory service;
- Create a query using such magical gizmos as the search filter (object_to_search) and the list of attributes to return (attributes_to_search). Why are they magical? Well, what else would an unprepared user call a string like this:
object_to_search = '(&(objectCategory=person)(objectClass=user)(userAccountControl:1.2.840.113556.1.4.803:=1048576))'
- Run the query and get its results;
- Process the results of query execution by discarding invalid records.
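To take some of the magic out of such filter strings, they can be assembled from small named pieces. The sketch below is an assumption-laden illustration, not code from the project: the helper names are hypothetical, and values are not escaped, so it only suits trusted, hard-coded input. The OID is the Active Directory bitwise-AND matching rule, and the two constants are documented userAccountControl flag values.

```python
# OID of the bitwise-AND matching rule (LDAP_MATCHING_RULE_BIT_AND) in Active Directory.
BIT_AND_RULE = "1.2.840.113556.1.4.803"

# userAccountControl flag values.
UAC_NOT_DELEGATED = 0x100000       # 1048576
UAC_SERVER_TRUST_ACCOUNT = 0x2000  # 8192, set on domain controller accounts


def equals(attribute: str, value: str) -> str:
    """Simple equality clause, e.g. (objectClass=user)."""
    return f"({attribute}={value})"


def bit_set(attribute: str, mask: int) -> str:
    """Clause that matches when all bits of mask are set in the attribute."""
    return f"({attribute}:{BIT_AND_RULE}:={mask})"


def all_of(*clauses: str) -> str:
    """Logical AND of several clauses."""
    return "(&" + "".join(clauses) + ")"


object_to_search = all_of(
    equals("objectCategory", "person"),
    equals("objectClass", "user"),
    bit_set("userAccountControl", UAC_NOT_DELEGATED),
)
```

Read this way, the filter above simply means: person objects of class user whose account has the "not delegated" flag set.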
1.2.2 Processing command line arguments.
The ADenum project code implements such processing, but first you have to find it there, and then try to figure out its structure without a stiff drink. Good luck.
So we will need our own command-line argument parser. Naturally, no one is going to write it from scratch, but its code should live in a separate module.
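A minimal sketch of such a separate parser module, built on the standard argparse library, might look like this. The program name, option names and mode names are assumptions made for illustration; the article does not specify the real command-line interface.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser (all option names here are hypothetical)."""
    parser = argparse.ArgumentParser(prog="dumper", description="Collect information about a domain")
    parser.add_argument("-d", "--domain", required=True, help="domain name")
    parser.add_argument("-u", "--username", required=True, help="user name")
    parser.add_argument("-p", "--password", required=True, help="password")
    parser.add_argument("--ip", default=None, help="domain controller address")
    parser.add_argument("--ssl", action="store_true", help="use LDAP over SSL")
    parser.add_argument("--mode", choices=["dump", "fulldump"], default="dump",
                        help="operating mode")
    return parser
```

Keeping the parser behind one function means the rest of the application only sees the resulting namespace object and never touches sys.argv directly.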
1.2.3 Logger
Logs are our everything. No more or less serious product can exist without them, so we will have a logger too. It doesn’t matter whether we write our own or take someone else’s, but it will also live in a separate module, and we will also teach it to work from multiple threads and to write to a file.
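If we take the "someone else's" route, the standard logging module already covers both requirements: its handlers serialize access with a lock, so they can be shared between threads, and a FileHandler writes to a file. The sketch below is an assumption about how such a module could look; it is not the DumpLogger class used in the listings of this article, and the function name and format string are hypothetical.

```python
import logging


def create_logger(name: str, log_path: str) -> logging.Logger:
    """Return a logger that writes both to the console and to log_path."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    if not logger.handlers:  # do not attach duplicate handlers on repeated calls
        formatter = logging.Formatter("%(asctime)s [%(threadName)s] %(levelname)s %(message)s")
        for handler in (logging.StreamHandler(), logging.FileHandler(log_path, encoding="utf-8")):
            handler.setFormatter(formatter)
            logger.addHandler(handler)
    return logger
```

Including the thread name in every record makes it much easier to untangle the output of a multithreaded run later.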
1.2.4 The results processing module
Currently, at the design stage, I have no idea what the result should look like, so let’s use one of Uncle Bob’s tips and postpone solving this issue until the very last moment. We will declare the interfaces, but leave the implementation empty. The same applies to the rest of the modules, with the exception of LDAP. Everything is simple and clear there.
2 Architecture design.
A well-designed architecture saves hundreds of man-hours and at least tens of thousands of dollars. It is therefore worth spending some time at the start in order to add functionality painlessly later.
So, let’s start, of course, with LDAP. Because it will be one of the key components of the core of the system.
2.1 Designing the architecture of the LDAP query processing module
As mentioned above, we will have 3 submodules here. Namely:
- Connection management module;
- Query execution module;
- Results collection and processing module;
Go.
2.1.1 LdapConnection is a module that provides LDAP connection and management.
Let’s figure out what we need to connect to the directory service. Again, we look at the ADEnum project code, Google it, and study it. As a result, we come to the conclusion that we will need the following parameters:
- domainName is the domain name required to form a string of the form username + ’@’ + domain_name, which will be used as a parameter when establishing a connection;
- username – the user’s name;
- password – password;
- IPAddress is the address of the domain controller, since the directory service lives there;
- useLdapWithSsl flag, in case we decide to use an SSL connection to connect;
- BaseDN – the base Distinguished Name, a unique representation of the domain name record; we will derive it ourselves (sample code is given below);
- LdapVersion – version of the LDAP protocol;
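One common way to derive the Base DN is to turn each label of the DNS domain name into a DC component. The function below is a hedged sketch of that idea; the name is hypothetical and it is not necessarily how the project's NetworkUtils.get_base_dn works. It also assumes that the directory naming context mirrors the DNS name, which is the usual case in Active Directory but not guaranteed.

```python
from typing import Optional


def domain_to_base_dn(domain_name: str) -> Optional[str]:
    """Convert 'corp.example.com' to 'DC=corp,DC=example,DC=com'.

    Returns None when the name is empty or contains an empty label.
    """
    labels = domain_name.strip().split(".")
    if any(not label for label in labels):
        return None
    return ",".join(f"DC={label}" for label in labels)
```

Returning None for malformed names lets a config validator treat "cannot build a Base DN" as "invalid domain name".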
The first thought is to pass all these parameters to the constructor. It is a workable idea, but it will cause plenty of trouble, because at this stage we know neither the exact number of parameters nor their types. It is also desirable to allow for them changing in the future: we want to make a high-quality product, not a throwaway hack. Therefore we will wrap all the parameters in a class and pass an instance of that class. And since we are only at the beginning of the road and will run into the same situation repeatedly, this clearly calls for a base class with a minimal set of fields and methods that are implemented in its children. If we were developing in a C-like language, we could simply pass a pointer to the base class and not worry about it. But we have Python. It can do OOP, of course, but we will have to tinker a little. The details are below.
The advantages of such a solution:
- we do not depend on the number and types of parameters;
- functionality common to all configs can be declared as abstract methods of the base class and implemented in the subclasses. In particular, we will definitely need a method for validating the config and a method for displaying it on the screen;
- versioning and backward compatibility become possible.
The disadvantage: we will have to write a little more code and watch the implementation a little more carefully.
For a detailed description of the configuration module, see below.
And now back to the matter at hand, that is, to LDAP. From here on we assume that the configs and the logger have already been implemented. So, let’s figure out what the class that provides the connection should do. Logically, it needs roughly the following:
- initialization of connection parameters;
- Connection setup;
- disconnection;
- Error handling.
Let’s start with a description of the errors. The following problems may occur:
- Authorization error – we were given incorrect credentials and could not log in. Trouble. We stop.
- The server is unavailable – the server is down for a reason we don’t know, and we can do nothing except ask the admin.
- The server response timed out – self-explanatory.
- Other errors – we know nothing about them, but we assume they may occur.
Therefore, we will use an enumeration to describe the error types.
import enum


class LdapLoginError(enum.Enum):
    NO_ERROR = 0
    INVALID_CREDENTIALS = 1
    SERVER_DOWN = 2
    OTHER_ERROR = 3
    TIMEOUT_ERROR = 4
Setting up a connection.
def is_ldap_connection_established(self) -> bool:
    DumpLogger.print_title(f'{self.title} is_ldap_connection_established')
    # If no address was given, try to resolve it from the domain name.
    if self.ldap_config.ip_address is None:
        DumpLogger.print_warning('ip address not specified, a default value will be used')
        ip_address = NetworkUtils.ger_current_host_ip_address(self.ldap_config.domain_name)
        if ip_address is None:
            DumpLogger.print_error('Unable to resolve a domain name:', self.ldap_config.domain_name)
            return False
        self.ldap_config.ip_address = ip_address
    DumpLogger.highlight_dark_blue("current ip:\t" + self.ldap_config.ip_address)
    self.__setup_ldap_connection()
    if self.connection is None:
        DumpLogger.print_error_message('invalid LDAP connection')
        return False
    return self.__is_success_ldap_login(self.ldap_config.domain_name, self.ldap_config.password,
                                        self.ldap_config.username)
Disconnection.
def disconnect(self) -> None:
    self.connection.unbind()
The class constructor looks like this:
import ldap
from ldap import VERSION3


class LdapConnection:
    def __init__(self, ldap_config: LdapConfig):
        self.ldap_config = ldap_config
        self.ldap_version = VERSION3
        # Note: the URI is built right here, so ip_address must already be set,
        # and the ldaps:// scheme is used regardless of use_ldap_with_ssl.
        self.connection = ldap.initialize('ldaps://' + self.ldap_config.ip_address)
        self.title = 'LdapConnection'
And the config that we pass as a parameter looks like this.
from ldap import VERSION3

from source.core.ldap.network_utils import NetworkUtils
from source.utils.app_config.configs.app_config import AppConfig
from source.utils.console.console_utils import DumpLogger
from source.utils.network.network_helper import NetworkHelper


class LdapConfig(AppConfig):
    def __init__(self, domain_name: str, username: str, password: str, ip_address: str, use_ldap_with_ssl: bool,
                 base_dn: str):
        super().__init__()
        self.domain_name = domain_name
        self.username = username
        self.password = password
        self.ip_address = ip_address
        self.use_ldap_with_ssl = use_ldap_with_ssl
        self.base_dn = base_dn
        self.smb_client_dialect = None
        self.ldap_version = VERSION3

    def print(self):
        DumpLogger.print_title('LDAP configuration')
        DumpLogger.print_param('domain name', self.domain_name)
        DumpLogger.print_param('username', self.username)
        DumpLogger.print_param('password', self.password)
        DumpLogger.print_param('ip address', self.ip_address)
        DumpLogger.print_param('base_dn', self.base_dn)

    def is_valid(self) -> bool:
        if not NetworkHelper.is_valid_ip_address(self.ip_address):
            DumpLogger.print_error('LDAP config. Invalid ip address', self.ip_address)
            return False
        if NetworkUtils.get_base_dn(self.domain_name) is None:
            DumpLogger.print_error('LDAP config. Invalid domain name', self.domain_name)
            return False
        return True

    def help(self):
        pass
At this point, the preliminary implementation of the connection management module can be considered complete.
2.1.2 LDAP query executor – query execution module.
So, we have learned how to establish a connection, which is wonderful in itself. Now we need to learn how to execute queries. For that we need an active connection and the unique representation of the domain name inside LDAP, which I will call the Base DN from here on. As a result, we get the following config:
class LdapQueryExecutorConfig(AppConfig):
    def __init__(self, ldap_connection: LdapConnection, base_dn: str):
        super().__init__()
        self.ldap_connection = ldap_connection
        self.base_dn = base_dn

    def print(self):
        print("LdapQueryExecutor configuration:")
        self.ldap_connection.ldap_config.print()

    def is_valid(self):
        return self.ldap_connection.ldap_config.is_valid()

    def help(self):
        pass
All requests are executed via LdapConnection; the task of LdapQueryExecutor is to pass the parameters to it and hand the results on. Without further ado, we simply look at the implementation in ADenum and get something like the following.
import ldap
from ldap.controls import SimplePagedResultsControl


class LdapQueryExecutor:
    def __init__(self, config: LdapQueryExecutorConfig):
        self.ldap_connector = config.ldap_connection
        self.base_dn = config.base_dn

    def search_server_ldap(self, object_to_search: str, attributes_to_search: list) -> list:
        result_search = []
        try:
            result = self.ldap_connector.connection.search_s(self.base_dn, ldap.SCOPE_SUBTREE,
                                                             object_to_search,
                                                             attributes_to_search)
            # Keep only entries that have a DN; the rest are discarded as invalid.
            for info in result:
                if info[0] is not None:
                    result_search.append([info[0], info[1]])
            if len(result_search) == 0:
                DumpLogger.highlight_warning("No entry found !")
        except ldap.OPERATIONS_ERROR as error:
            DumpLogger.print_error("OPERATIONS_ERROR: ", str(error))
            raise error
        except ldap.LDAPError as error:
            DumpLogger.print_error("LDAPError: ", str(error))
            raise error
        return result_search

    def search_server_ldap_pages(self, object_to_search: str, attributes_to_search: list) -> list:
        # Ask the server to return results in pages of up to 1000 entries (RFC 2696).
        page_control = SimplePagedResultsControl(True, size=1000, cookie='')
        try:
            response = self.ldap_connector.connection.search_ext(self.base_dn,
                                                                 ldap.SCOPE_SUBTREE,
                                                                 object_to_search,
                                                                 attributes_to_search,
                                                                 serverctrls=[page_control])
            result = []
            pages = 0
            while True:
                pages += 1
                rtype, rdata, rmsgid, serverctrls = self.ldap_connector.connection.result3(response)
                result.extend(rdata)
                controls = [control for control in serverctrls
                            if control.controlType == SimplePagedResultsControl.controlType]
                if not controls:
                    print('The server ignores RFC 2696 control')
                    break
                # An empty cookie means that this was the last page.
                if not controls[0].cookie:
                    break
                page_control.cookie = controls[0].cookie
                # Request the next page. search_ext returns a message id, not data,
                # so it must not be appended to the result list.
                response = self.ldap_connector.connection.search_ext(self.base_dn,
                                                                     ldap.SCOPE_SUBTREE,
                                                                     object_to_search,
                                                                     attributes_to_search,
                                                                     serverctrls=[page_control])
            return result
        except Exception as err:
            DumpLogger.print_error('search_server_ldap_pages', str(err))
            raise err
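The cookie handshake behind paged search is easier to see without a real directory server. The sketch below is a pure-Python simulation under stated assumptions: `make_fake_server` is a hypothetical stand-in for the server, and its cookie format (an offset encoded as text) is invented for the example, since real servers treat the cookie as an opaque value.

```python
from typing import Callable, List, Tuple

Page = Tuple[List[str], bytes]  # (entries, cookie); an empty cookie means "last page"


def make_fake_server(entries: List[str], page_size: int) -> Callable[[bytes], Page]:
    """Hypothetical stand-in for a directory server that pages its results."""
    def fetch(cookie: bytes) -> Page:
        start = int(cookie) if cookie else 0
        end = start + page_size
        next_cookie = str(end).encode() if end < len(entries) else b""
        return entries[start:end], next_cookie
    return fetch


def collect_all_pages(fetch: Callable[[bytes], Page]) -> List[str]:
    """Keep sending back the cookie from the last response until it comes back empty."""
    result: List[str] = []
    cookie = b""
    while True:
        page, cookie = fetch(cookie)
        result.extend(page)
        if not cookie:
            return result
```

The client never interprets the cookie; it only echoes it back, which is exactly what the loop in search_server_ldap_pages does with page_control.cookie.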
2.1.3 LDAP Data Collector
And finally, the tastiest part of this section: the module for collecting and processing results.
As usual, let’s start with the config. Here it is extremely simple.
class LdapDataCollectorConfig(AppConfig):
    def __init__(self, ldap_query_executor: LdapQueryExecutor):
        super().__init__()
        self.ldap_query_executor = ldap_query_executor

    def print(self):
        DumpLogger.print_title('LdapDataCollector configuration ')
        self.ldap_query_executor.ldap_connector.ldap_config.print()

    def is_valid(self):
        return self.ldap_query_executor.ldap_connector.ldap_config.is_valid()

    def help(self):
        pass
An observant reader will immediately notice that we have a matryoshka doll of configs here and ask: “what the hell?” The answer is simple: order beats class. All the details about configs, builders and the like are in the corresponding section.
In the meantime, let’s look at the basic functionality of this class. Its constructor looks like this:
from collections import defaultdict


class LdapDataCollector:
    def __init__(self, config: LdapDataCollectorConfig):
        self.query_executor = config.ldap_query_executor
        self.domain_users = list()
        self.domain_admins = list()
        self.enterprise_admins = list()
        self.domain_controllers = list()
        self.domain_trusts = list()
        self.servers = list()
        self.user_pc = list()
        self.os_versions = set()
        self.server_os_count = 0
        self.user_os_count = 0
        self.os_counter = defaultdict(list)
        self.user_groups = defaultdict(list)
        self.computers = dict()
        self.ad_organizational_units = list()
        self.ad_subnets = list()
        self.ad_groups = list()
In fact, it is a kind of container that accumulates the results of all queries. Below is a list of its public methods and the implementation of some of them. I leave the implementation of the rest to the curious reader; there is nothing complicated there.
def get_domain_admins(self) -> list:
    DumpLogger.print_title('get_domain_admins')
    object_to_search = '(&(objectCategory=user)(adminCount=1))'
    result = self.query_executor.search_server_ldap_pages(object_to_search, attributes_to_search=["*"])
    for info in result:
        if not self.__is_valid_data(info):
            continue
        res, name, sAMAccountName = self.__get_full_user_information(info)
        if not self.__is_valid_query_result(name, res):
            continue
        self.domain_admins.append(res)
    DumpLogger.print_success('Done...')
    return self.domain_admins

def get_enterprise_admins(self) -> list:
    # your code here
    return self.enterprise_admins

def get_user_groups(self, username: str = '') -> list:
    DumpLogger.print_title('get_user_groups')
    # your code here
    DumpLogger.print_success('Done...')
    return self.user_groups[username]

def get_domain_controllers(self) -> list:
    DumpLogger.print_title('get_domain_controllers')
    object_to_search = '(&(objectCategory=computer)(userAccountControl:1.2.840.113556.1.4.803:=8192))'
    attributes_to_search = ["dNSHostName", "operatingSystem", "operatingSystemVersion"]
    result = self.query_executor.search_server_ldap_pages(object_to_search, attributes_to_search)
    self.domain_controllers = self.__get_computers_info(result, is_os_version_needed=True)
    DumpLogger.print_success('Done...')
    return self.domain_controllers

def get_domain_trusts(self) -> list:
    DumpLogger.print_title('get_domain_trusts')
    # your code here
    DumpLogger.print_success('Done...')
    return self.domain_trusts

def get_domain_computers_full_info(self) -> None:
    DumpLogger.print_title('get domain computers full info')
    object_to_search = '(&(objectCategory=computer))'
    attributes_to_search = ["dNSHostName", "operatingSystem", "operatingSystemVersion"]
    result = self.query_executor.search_server_ldap_pages(object_to_search, attributes_to_search)
    try:
        for info in result:
            if not self.__is_valid_data(info):
                continue
            # A failure on one record must not stop processing of the others.
            try:
                self.__get_computer_full_info(info)
            except Exception as err:
                DumpLogger.print_error_message(str(err))
    except Exception as err:
        DumpLogger.print_error_message(str(err))
    DumpLogger.print_success('Done...')

def get_domain_users(self) -> list:
    DumpLogger.print_title('get domain users')
    # your code here
    DumpLogger.print_success('Done...')
    return self.domain_users

def get_ad_organizational_unit(self) -> list:
    DumpLogger.print_title('get AD organizational units')
    object_to_search = '(&(objectcategory=organizationalUnit))'
    attributes_to_search = ['*']
    result = self.query_executor.search_server_ldap_pages(object_to_search, attributes_to_search)
    for info in result:
        if not self.__is_valid_data(info):
            continue
        self.ad_organizational_units.append(info[0])
    DumpLogger.print_success('Done...')
    return self.ad_organizational_units

def get_ad_subnets(self) -> list:
    DumpLogger.print_title('get domain subnets')
    # your code here
    DumpLogger.print_success('Done...')
    return self.ad_subnets

def get_ad_groups(self) -> list:
    DumpLogger.print_title('get AD groups')
    # your code here
    return self.ad_groups
2.2 Conclusions.
We have divided the interaction with LDAP into several modules, each of which is an independent unit. In addition, we managed to encapsulate all the “magic” of the queries. For comparison, in AdFind, to execute a query you need to remember, or constantly keep at hand, magic strings such as:
adfind.exe -f "(objectcategory=person)" > ad_users.txt
adfind.exe -f "objectcategory=computer" > ad_computers.txt
adfind.exe -f "(objectcategory=organizationalUnit)" > ad_ous.txt
adfind.exe -sc trustdmp > ad_trusts.txt
adfind.exe -subnets -f (objectCategory=subnet)>ad_subnets.txt
adfind.exe -f "(objectcategory=group)" > ad_groups.txt
adfind.exe -gcb -sc trustdmp > trustdmp.txt
In our implementation, everything looks a little simpler.
query_executor = LdapQueryExecutor(query_executor_config)
data_collector_config = ConfigFactory.create_data_collector_config(query_executor)
data_collector = LdapDataCollector(data_collector_config)
domain_admins = data_collector.get_domain_admins()
subnets = data_collector.get_ad_subnets()
trusts = data_collector.get_domain_trusts()
3 Modes of operation of the application
Now let’s talk about how to organize the operation of the application properly, that is, how to make it as easy to use and as easy to extend as possible.
Go.
In essence, we want to take network dumps that contain different sets of parameters. Sometimes you need the most complete information about the network, including lists of users, hosts, subnets, groups, and much more. Sometimes you need only a minimal data set: the number of hosts, the number of users, and the list of groups that the current user, on whose behalf the requests are made, belongs to.
I will consider two types of dumps. There are several more in the project, but we are studying architecture, not the project.
3.1 Dump
In this mode, we collect the necessary minimum of information about the network, such as:
- a list of users;
- list of domain admins;
- list of Enterprise admins;
- list of domain controllers;
- list of trusts;
- list of servers;
- list of workstations;
- statistics on operating systems;
3.2 FullDump
In this mode, we want to collect all available information about the network, adding the following to the basic dump:
- a list of groups;
- organizational units (OUs);
- subnets;
- the groups of which the current user, on whose behalf we make requests, is a member;
- total number of hosts;
- the total number of users.
3.3 Architecture design.
Okay, now we have two modes of operation. We could hard-code them relatively painlessly and forget about them. But we are writing enterprise-grade software, so we simply have to provide for extension.
Let’s think out loud. What do we have? We have a module that collects all the information and returns it as a set of collections. But by itself it is not an end product. The final product should be a complete result that can be manipulated in some way: displayed on the screen, saved to a file, written to a database. There. We have just outlined the AbstractProduct class and its abstract methods. Below is its code.
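from abc import abstractmethod


# Note: the class does not derive from abc.ABC, so @abstractmethod serves only
# as a marker here and the class can still be instantiated.
class AbstractProduct:
    def __init__(self):
        self.is_valid = True

    @abstractmethod
    def print_results(self):
        pass

    @abstractmethod
    def save(self, app_config: AppConfig):
        pass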
Please note that we have a config again, and not a specific one but the base class. In good old C++ I would say that we are passing a pointer to the base class. Long live polymorphism. Right now I do not know what kind of product I will have or which parameters its save method will receive, and it does not particularly matter to me. The config knows this, and it will tell us what to do with the result.
Let’s move on. If you look at the immortal work of the Gang of Four, it becomes clear that the “Builder” pattern is already showing through here. Let’s recall what it is by going to the source.
Builder is a creational design pattern that lets you construct complex objects step by step. The builder makes it possible to use the same construction code to produce different representations of objects. (The definition is taken from the book by the wonderful author Alexander Shvets.)
So, the Builder pattern lets us assemble the product the way we need. We have the LdapDataCollector class, which can collect all the necessary information, but each product needs a different subset of this data. The closest comparison is car trim levels. We can buy the bare-bones model with a manual gearbox, or we can pay extra and enjoy the top trim. The car underneath is the same, but the equipment is different. I will not duplicate the text of the book here. It is easy to Google, but if you really need it, I can share it. I am too lazy to draw class diagrams; Shvets describes everything in as much detail as possible.
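For readers who have not met the pattern before, here is a deliberately tiny sketch of the idea: the same building steps, combined differently, give different products. All names in it (Report, ReportBuilder and the section names) are hypothetical and chosen for illustration only; the article's own builders are organized differently.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Report:
    """Hypothetical product: its contents depend on which steps were run."""
    sections: List[str] = field(default_factory=list)


class ReportBuilder:
    """Adds one part per step; each step returns self so that calls can be chained."""

    def __init__(self) -> None:
        self._report = Report()

    def add_users(self) -> "ReportBuilder":
        self._report.sections.append("users")
        return self

    def add_computers(self) -> "ReportBuilder":
        self._report.sections.append("computers")
        return self

    def add_subnets(self) -> "ReportBuilder":
        self._report.sections.append("subnets")
        return self

    def build(self) -> Report:
        return self._report


def build_basic() -> Report:
    """The 'base trim': only the essentials."""
    return ReportBuilder().add_users().add_computers().build()


def build_full() -> Report:
    """The 'top trim': the same steps plus the extras."""
    return ReportBuilder().add_users().add_computers().add_subnets().build()
```

The construction code is shared, and only the list of steps differs between the basic and the full variant.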
Let’s get back to our dumps.
3.3.1 Products
The builder will produce two products: a regular dump and a full one. Below is the code of their constructors and the implementation of the abstract methods of the AbstractProduct base class.
class Dump(AbstractProduct):
    def __init__(self):
        super().__init__()
        self.domain_users = list()
        self.domain_admins = list()
        self.enterprise_admins = list()
        self.domain_controllers = list()
        self.domain_trusts = list()
        self.servers = list()
        self.user_pc = list()
        self.os_versions = set()
        self.server_os_count = 0
        self.user_os_count = 0
        self.os_counter = dict()
        self.computers = dict()

    def print_results(self):
        DumpLogger.print_title('Dump. print_results')
        self.print_domain_admins()
        self.print_enterprise_admins()
        self.print_domain_controllers()
        self.print_domain_computers()

    def save(self, app_config: DumpConfig):
        DumpLogger.print_title('Dump saving results...')
        self.save_domain_users(app_config)
        self.save_domain_admins(app_config)
        self.save_enterprise_admins(app_config)
        self.save_servers(app_config)
        self.save_users_pc(app_config)
        self.save_os_statistic(app_config)
        DumpLogger.print_success('Done...')
Full Dump
Since FullDump is an extended version of a regular dump, we will simply inherit it from Dump. The result is something like the following.
class Fulldump(Dump):
    def __init__(self):
        super().__init__()
        self.ad_organizational_units = list()
        self.ad_subnets = list()
        self.ad_groups = list()
        self.user_groups = list()
        self.users_count = 0
        self.computers_count = 0

    def print_results(self):
        super().print_results()
        self.print_domain_groups()
        self.print_domain_subnets()
        self.print_organizational_units()
        # DumpLogger.print_param('found', self.)

    def save(self, app_config: FulldumpConfig):
        super().save(app_config)
        self.save_organizational_unit(app_config)
        self.save_ad_groups(app_config)
        self.save_subnets(app_config)
        title = "Fastdump"
        filename = app_config.fast_dump_filename
        FileHelper.append_title_to_file(filename, title)
        FileHelper.append_to_file(filename, f'found {self.users_count} users; ')
        FileHelper.append_to_file(filename, f'found {self.computers_count} computers; ')
        FileHelper.append_to_file(filename, 'current user is member of ')
        FileHelper.save_list_to_file(self.user_groups, filename, 'user groups')
3.3.2 Builders
Now we need builders that will produce our products. We will also need a factory that assembles the builders themselves from the parameters passed to it. Let’s start with the factory. At the heart of each builder is our unchanging LdapDataCollector. We also need a set of parameters for each product, which we pass in the form of a config. The result is something like the following:
class BuilderFactory:
    def __init__(self):
        pass

    @staticmethod
    def create_fulldump_builder(app_config: FulldumpConfig, data_collector: LdapDataCollector) -> AbstractBuilder:
        return FulldumpBuilder(app_config, data_collector)

    @staticmethod
    def create_minidump_builder(app_config: MinidumpConfig, data_collector: LdapDataCollector) -> AbstractBuilder:
        return MinidumpBuilder(app_config, data_collector)
The factory has only two static methods so far, but it will acquire more as the product grows.
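If the number of such static methods ever becomes a burden, one alternative (a different technique from the factory shown above, not the one used in the project) is a small registry that maps a mode name to a callable that creates the builder. The class and method names below are hypothetical.

```python
from typing import Callable, Dict


class BuilderRegistry:
    """Maps a mode name to a callable that creates the matching builder."""

    def __init__(self) -> None:
        self._creators: Dict[str, Callable[..., object]] = {}

    def register(self, mode: str, creator: Callable[..., object]) -> None:
        self._creators[mode] = creator

    def create(self, mode: str, *args, **kwargs) -> object:
        try:
            creator = self._creators[mode]
        except KeyError:
            raise ValueError(f"unknown mode: {mode}") from None
        return creator(*args, **kwargs)
```

With this approach a new mode is added by one register call, for example registering a builder class under the name "fulldump", instead of by writing a new factory method. The price is that the type checker no longer sees which config type each builder expects.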
Now let’s look at the builders themselves. For convenience, let’s introduce one more builder class and call it MinidumpBuilder. It will assemble the minimal dump.
In total, we have 4 builder classes:
AbstractBuilder is the parent abstract class; it implements no methods. Its constructor receives the LdapDataCollector, which is then available to all its descendants. It has only two abstract methods: build_product and setup_incomplete_product. The first tries to assemble the product. The second cleans up after the failures of the first and reports the errors.
class AbstractBuilder:
    def __init__(self, data_collector: LdapDataCollector):
        self.data_collector = data_collector
        self.is_build_completed = True
        self.error_message = ''

    @abstractmethod
    def build_product(self) -> AbstractProduct:
        pass

    @abstractmethod
    def setup_incomplete_product(self, err, error_message):
        pass
DumpBuilder is the base class for dump builders. I don’t yet know what other products I’ll be releasing, but it’s obvious that dumps deserve a category of their own. Its implementation came out like this: we pass the product parameters in, assemble the product inside private methods, and return an object ready for further use.
class DumpBuilder(AbstractBuilder):
    def __init__(self, data_collector: LdapDataCollector, app_config: DumpConfig, mode: ProgramMode):
        super().__init__(data_collector)
        self.app_config = app_config
        self.program_mode = mode
        # Placeholder; subclasses replace it with a concrete product.
        self._result = AbstractProduct()

    @abstractmethod
    def build_product(self) -> Dump:
        pass

    def setup_incomplete_product(self, err, error_message):
        DumpLogger.print_error_message(self.error_message)
        self.is_build_completed = False

    def _is_data_collected(self) -> bool:
        # Each step records its own failure via setup_incomplete_product.
        self.__find_domain_users()
        self.__find_domain_admins()
        self.__find_enterprise_admins()
        self.__find_domain_controllers()
        self.__find_domain_trusts()
        self.__find_domain_computers()
        if not self.is_build_completed:
            DumpLogger.print_error('The data collecting for a dump mode failed with error', self.error_message)
            return False
        self._result.servers = self.data_collector.servers
        self._result.user_pc = self.data_collector.user_pc
        self._result.server_os_count = self.data_collector.server_os_count
        self._result.user_os_count = self.data_collector.user_os_count
        self._result.os_counter = self.data_collector.os_counter
        self.is_build_completed = True
        return True
MinidumpBuilder – see the description above.
class MinidumpBuilder(DumpBuilder):
    def __init__(self, app_config: MinidumpConfig, data_collector: LdapDataCollector):
        super().__init__(data_collector, app_config, ProgramMode.MINI_DUMP)
        self.app_config = app_config
        self._result = Minidump()

    def build_product(self) -> AbstractProduct | None:
        if not self._is_data_collected():
            DumpLogger.print_error('MinidumpBuilder error',
                                   'failed to collect basic information about the network')
            self.is_build_completed = False
            return None
        self.is_build_completed = True
        DumpLogger.highlight_green('Done...')
        return self._result
As you can easily see, the implementation took only a few lines. Let’s see what happens with the full dump builder.
class FulldumpBuilder(DumpBuilder):
    def __init__(self, app_config: FulldumpConfig, data_collector: LdapDataCollector):
        super().__init__(data_collector, app_config, ProgramMode.FULL_DUMP)
        self.app_config = app_config
        self._result = Fulldump()

    def build_product(self) -> AbstractProduct | None:
        DumpLogger.print_title('FULLDUMP BUILDER build product')
        try:
            # First the data shared with the regular dump, then the extras.
            if not self._is_data_collected():
                DumpLogger.print_error('FulldumpBuilder error',
                                       'failed to collect basic information about the network')
                return None
            if not self.__collect_fulldump_data():
                self.error_message = 'FulldumpBuilder error. Failed to collect full dump data'
                DumpLogger.print_error_message(self.error_message)
                return None
            self.is_build_completed = True
            DumpLogger.highlight_green('Done...')
            return self._result
        except Exception as err:
            DumpLogger.print_error('Error in building a full network dump', str(err))
            raise err

    def __collect_fulldump_data(self) -> bool:
        self.__find_ad_ou()
        self.__find_ad_subnets()
        self.__find_ad_groups()
        self._result.computers_count = len(self._result.servers) + len(self._result.user_pc)
        self._result.users_count = len(self.data_collector.get_domain_users())
        self._result.user_groups = self.data_collector.get_user_groups()
        if not self.is_build_completed:
            return False
        self.is_build_completed = True
        DumpLogger.highlight_green('Done...')
        return True

    def __find_ad_groups(self):
        try:
            self._result.ad_groups = self.data_collector.get_ad_groups()
        except Exception as err:
            self.error_message = f'get ad groups failed with error: {str(err)} '
            self.setup_incomplete_product(err, self.error_message)

    def __find_ad_subnets(self):
        try:
            self._result.ad_subnets = self.data_collector.get_ad_subnets()
        except Exception as err:
            self.error_message = f'get ad subnets failed with error: {str(err)} '
            self.setup_incomplete_product(err, self.error_message)

    def __find_ad_ou(self):
        DumpLogger.print_title('FULLDUMP BUILDER __collect_fulldump_data')
        try:
            self._result.ad_organizational_units = self.data_collector.get_ad_organizational_unit()
        except Exception as err:
            self.error_message = f'get ad ou failed with error: {str(err)} '
            self.setup_incomplete_product(err, self.error_message)
The code turned out to be compact and readable. Everything is according to Feng Shui.
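The three finder methods share one shape: call a collector method, store the result, record an error on failure. As a rough sketch only, that shape could be factored into a single helper. The names `safe_collect`, `broken` and the `errors` list are hypothetical and not part of the project.

```python
from typing import Any, Callable, List


def safe_collect(label: str, getter: Callable[[], Any], errors: List[str]) -> Any:
    """Call getter(); on failure record a message in errors and return None."""
    try:
        return getter()
    except Exception as err:
        errors.append(f'get {label} failed with error: {err}')
        return None


def broken():
    # Simulates a collector call that fails.
    raise RuntimeError('LDAP timeout')


errors: List[str] = []
groups = safe_collect('ad groups', lambda: ['Domain Admins'], errors)
subnets = safe_collect('ad subnets', broken, errors)
print(groups, subnets, errors)
```

Whether this is worth it for three short methods is a matter of taste; the explicit version is easier to step through in a debugger.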
4 Configs, configurators
Above, we repeatedly passed a config class as a parameter. It is time to give it due attention.
Application parameters can be passed via the command line if there are not too many of them. If the application is a multicooker with a lot of parameters, they should go into a configuration file that we read, parse and then work with. But that is a topic for another time. We will pass them like ordinary mortals, via the command line. At first I wanted to describe the argument parser as well, but the article has already come out quite voluminous. Let's limit ourselves to the configs and configurators themselves.
Here is the catch. From the same parameters we can assemble configs for different modes of operation of the application. With one or two modes you can simply change the parameters and not rack your brain. That is not our case, so we roll up our sleeves and get going.
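The argument parser itself is left out of this article, but a minimal sketch with the standard `argparse` module gives the idea. The option names (`--domain`, `--mode`, `--out-dir`) and the list of modes are assumptions made for illustration, not the tool's actual command line.

```python
import argparse


def parse_args(argv=None) -> argparse.Namespace:
    # Hypothetical options; the real tool's flags may differ.
    parser = argparse.ArgumentParser(description='AD dump tool (sketch)')
    parser.add_argument('--domain', required=True, help='domain name, e.g. evil-corp.local')
    parser.add_argument('--mode', choices=['minidump', 'fulldump'], default='minidump')
    parser.add_argument('--out-dir', default='evil-corp', help='root output directory')
    return parser.parse_args(argv)


# Passing an explicit list instead of reading sys.argv keeps the sketch testable.
args = parse_args(['--domain', 'evil-corp.local', '--mode', 'fulldump'])
print(args.domain, args.mode, args.out_dir)
```

The parsed values are exactly the "same parameters" from which the configs for the different modes are then assembled.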
4.1 Dump configs
Since we have 4 builders, there will be as many configs, one for each.
4.1.1 AppConfig
An abstract class with "purely virtual methods", may the Python adherents forgive me.
from abc import ABC, abstractmethod


class AppConfig(ABC):  # ABC is required for @abstractmethod to be enforced
    def __init__(self):
        pass

    @abstractmethod
    def print(self):
        pass

    @abstractmethod
    def is_valid(self) -> bool:
        pass

    @abstractmethod
    def help(self):
        pass
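One detail is worth illustrating: `@abstractmethod` is only enforced when the class derives from `abc.ABC` (or uses `ABCMeta` as its metaclass). A minimal sketch follows; `DemoConfig` is a hypothetical subclass invented for the example.

```python
from abc import ABC, abstractmethod


class AppConfig(ABC):
    @abstractmethod
    def print(self): ...

    @abstractmethod
    def is_valid(self) -> bool: ...

    @abstractmethod
    def help(self): ...


class DemoConfig(AppConfig):  # hypothetical subclass, not part of the project
    def print(self):
        print('demo config')

    def is_valid(self) -> bool:
        return True

    def help(self):
        print('no parameters')


# The abstract base cannot be instantiated; a subclass that implements
# every abstract method can.
try:
    AppConfig()
    instantiated = True
except TypeError:
    instantiated = False

demo = DemoConfig()
print(instantiated, demo.is_valid())
```

Without the `ABC` base the decorator is silently ignored, and the "abstract" class can be instantiated like any other.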
4.1.2 DumpConfig
Here we already store quite specific parameters, such as:
- the data storage directory for the current operating mode;
- the directory for saving sorted users;
- the directory for saving sorted workstations;
- the directory for saving sorted servers;
- the file names for saving lists of admins and other evil spirits. The code makes the rest clear.
from source.utils.app_config.configs.app_config import AppConfig
from source.utils.console.console_utils import DumpLogger


class DumpConfig(AppConfig):
    def __init__(self):
        super().__init__()
        # All paths start empty; a configurator fills them in later.
        self.current_mode_out_dir = ''
        self.sorted_users_dir = ''
        self.user_pc_filename = ''
        self.server_os_filename = ''
        self.domain_users_filename = ''
        self.enterprise_admins_filename = ''
        self.domain_admins_filename = ''
        self.sorted_computers_dir = ''

    def print(self):
        DumpLogger.print_param('current mode out dir', self.current_mode_out_dir)
        DumpLogger.print_param('sorted users dir', self.sorted_users_dir)
        DumpLogger.print_param('users PC file', self.user_pc_filename)
        DumpLogger.print_param('server OS file', self.server_os_filename)
        DumpLogger.print_param('domain users file', self.domain_users_filename)
        DumpLogger.print_param('enterprise admins file', self.enterprise_admins_filename)
        DumpLogger.print_param('domain admins file', self.domain_admins_filename)
        DumpLogger.print_param('sorted computers dir', self.sorted_computers_dir)

    def is_valid(self) -> bool:
        # Delegates to the base class stub, which has no real validation yet.
        return super().is_valid()

    def help(self):
        super().help()
4.1.3 MinidumpConfig
class MinidumpConfig(DumpConfig):
    def __init__(self):
        super().__init__()
        self.domain_users_filename = 'domain_users.txt'

    def print(self):
        DumpLogger.print_title('Minidump configuration')
        # super().print() already prints the domain users file
        super().print()

    def is_valid(self) -> bool:
        return True  # todo: implement this

    def help(self):
        super().help()
        DumpLogger.print_title('Minidump configuration. See README.md for more information')
4.1.4 FulldumpConfig
class FulldumpConfig(DumpConfig):
    def __init__(self):
        super().__init__()
        self.subnets_filename = ''
        self.groups_filename = ''
        self.organizational_unit_filename = ''
        self.fast_dump_filename = ''

    def print(self):
        DumpLogger.print_title('Fulldump configuration')
        super().print()
        DumpLogger.print_param('subnets file', self.subnets_filename)
        DumpLogger.print_param('domain groups file', self.groups_filename)
        DumpLogger.print_param('organizational units file', self.organizational_unit_filename)

    def is_valid(self) -> bool:
        return super().is_valid()

    def help(self):
        super().help()
4.2 Configurators
As you may have noticed, most of the values are not set in the configs themselves. You could, of course, pass everything through constructor parameters, but that is the last century. So we will build ourselves a contraption, namely configurators.
The main task of a configurator is to pass the correct parameters to the config. Which is what we are going to do.
We act on the same principle: each config gets its own configurator.
4.2.1 AppConfigurator
An abstract base class that accepts some general parameters as input and an empty config that it will fill in with these parameters.
import os
from abc import ABC, abstractmethod


class AppConfigurator(ABC):
    def __init__(self, domain_name: str, current_mode_name, out_dir: str = 'evil-corp'):
        self.out_dir = out_dir
        self._root_out_dir = ''
        self._current_mode_out_dir = ''
        self._domain_name = domain_name
        self._current_mode_name = current_mode_name

    @abstractmethod
    def setup(self):
        pass

    @abstractmethod
    def create_out_dirs(self):
        # Abstract, but with a default body that subclasses call via super().
        self.create_root_dir()
        self.create_current_mode_out_dir()

    def create_root_dir(self):
        os.makedirs(self.out_dir, exist_ok=True)

    def create_current_mode_out_dir(self):
        # Layout: <out_dir>/<domain with dots replaced by underscores>/<mode name>
        corp_dir = self._domain_name.replace('.', '_')
        tmp = os.path.join(self.out_dir, corp_dir)
        self._current_mode_out_dir = os.path.join(tmp, self._current_mode_name)
        os.makedirs(self._current_mode_out_dir, exist_ok=True)
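To make the resulting layout concrete, here is a small sketch that reproduces the same path logic inside a temporary directory. `build_mode_dir` is a hypothetical helper, not part of the project; it relies on `os.makedirs(..., exist_ok=True)`, which creates missing parent directories and tolerates existing ones.

```python
import os
import tempfile


def build_mode_dir(out_dir: str, domain_name: str, mode_name: str) -> str:
    # Same naming rule as in the configurator: dots in the domain become underscores.
    corp_dir = domain_name.replace('.', '_')
    mode_dir = os.path.join(out_dir, corp_dir, mode_name)
    os.makedirs(mode_dir, exist_ok=True)
    return mode_dir


with tempfile.TemporaryDirectory() as root:
    mode_dir = build_mode_dir(os.path.join(root, 'evil-corp'), 'evil-corp.local', 'fulldump')
    relative = os.path.relpath(mode_dir, root)  # path relative to the temporary root
    created = os.path.isdir(mode_dir)

print(relative)
```

Each mode thus gets its own subdirectory under a per-domain directory, so dumps of different domains and modes never overwrite each other.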
4.2.2 DumpConfigurator
import os


class DumpConfigurator(AppConfigurator):
    def __init__(self, dump_config: DumpConfig, domain_name: str, current_mode: str, out_dir: str = 'evil-corp'):
        super().__init__(domain_name, current_mode, out_dir)
        self.config = dump_config

    def setup(self):
        super().setup()
        # Directories first: the file names below are built from them.
        self.create_out_dirs()
        self.config.domain_admins_filename = os.path.join(self.config.sorted_users_dir, 'domain_admins.txt')
        self.config.enterprise_admins_filename = os.path.join(self.config.sorted_users_dir, 'enterprise_admins.txt')
        self.config.domain_users_filename = os.path.join(self.config.sorted_users_dir, 'domain_users.txt')
        self.config.server_os_filename = os.path.join(self.config.sorted_computers_dir, 'servers.txt')
        self.config.user_pc_filename = os.path.join(self.config.sorted_computers_dir, 'user_pc.txt')

    def create_out_dirs(self):
        # The base class creates the root and the current mode directories.
        super().create_out_dirs()
        self.config.current_mode_out_dir = self._current_mode_out_dir
        self.config.sorted_users_dir = os.path.join(self._current_mode_out_dir, 'sorted_users')
        os.makedirs(self.config.sorted_users_dir, exist_ok=True)
        self.config.sorted_computers_dir = os.path.join(self._current_mode_out_dir, 'sorted_computers')
        os.makedirs(self.config.sorted_computers_dir, exist_ok=True)
4.2.3 MinidumpConfigurator
class MinidumpConfigurator(DumpConfigurator):
    def __init__(self, minidump_config: MinidumpConfig, domain_name: str, out_dir: str = 'evil-corp'):
        super().__init__(minidump_config, domain_name, 'minidump', out_dir)

    def setup(self):
        super().setup()

    def create_out_dirs(self):
        super().create_out_dirs()
4.2.4 FulldumpConfigurator
import os.path

from source.utils.app_config.configs.app_mode_configs.fulldump_config import FulldumpConfig
from source.utils.app_config.configurators.dump_configurator import DumpConfigurator
from source.utils.console.console_utils import DumpLogger


class FulldumpConfigurator(DumpConfigurator):
    def __init__(self, fulldump_config: FulldumpConfig, domain_name: str, out_dir: str = 'evil-corp'):
        super().__init__(fulldump_config, domain_name, 'fulldump', out_dir)

    def setup(self):
        super().setup()
        self.config.organizational_unit_filename = \
            os.path.join(self.config.current_mode_out_dir, 'organizational_unit.txt')
        self.config.subnets_filename = os.path.join(self.config.current_mode_out_dir, 'subnets.txt')
        self.config.groups_filename = os.path.join(self.config.current_mode_out_dir, 'groups.txt')
        self.config.fast_dump_filename = os.path.join(self.config.current_mode_out_dir, 'fastdump.txt')

    def create_out_dirs(self):
        super().create_out_dirs()
        DumpLogger.print_title('FulldumpConfigurator create_out_dirs')
4.3 Conclusions
All our work now comes down to creating a config, passing it to the builder, assembling the product and printing the results. The final code that launches one of the modes looks approximately like this.
def run_fulldump_mode(self, fulldump_config, data_collector) -> bool:
    try:
        DumpLogger.print_title('run_fulldump_mode started')
        # 1. The configurator fills the empty config with paths.
        fulldump_configurator = \
            ConfiguratorsFactory.create_fulldump_configurator(fulldump_config, self.domain_name, self.out_dir)
        fulldump_configurator.setup()
        # 2. The builder assembles the product from the collected data.
        fulldump_builder = BuilderFactory.create_fulldump_builder(fulldump_config, data_collector)
        fulldump = fulldump_builder.build_product()
        if fulldump is None:  # build_product() returns None on failure
            return False
        # 3. Print and save the results.
        self.result = fulldump
        self.result.print_results()
        self.result.save(fulldump_config)
        return True
    except Exception as error:
        DumpLogger.print_error('Oh, sorry, something broke, but we\'re already working on it', str(error))
        return False
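The whole flow can be condensed into a self-contained sketch that runs without a domain controller. All classes below (`MiniConfig`, `MiniConfigurator`, `FakeCollector`, `MiniProduct`, `MiniBuilder`) are simplified stand-ins invented for illustration; they are not the project's classes, and the canned user names are made up.

```python
import os
import tempfile
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class MiniConfig:
    out_dir: str = ''
    users_filename: str = ''


class MiniConfigurator:
    """Fills an empty config with paths (stand-in for a configurator)."""
    def __init__(self, config: MiniConfig, out_dir: str):
        self.config = config
        self.out_dir = out_dir

    def setup(self) -> None:
        os.makedirs(self.out_dir, exist_ok=True)
        self.config.out_dir = self.out_dir
        self.config.users_filename = os.path.join(self.out_dir, 'domain_users.txt')


class FakeCollector:
    """Stand-in for the LDAP data collector; returns canned data."""
    def get_domain_users(self) -> List[str]:
        return ['alice', 'bob']


@dataclass
class MiniProduct:
    users: List[str] = field(default_factory=list)

    def save(self, config: MiniConfig) -> None:
        with open(config.users_filename, 'w', encoding='utf-8') as out:
            out.write('\n'.join(self.users))


class MiniBuilder:
    def __init__(self, config: MiniConfig, collector: FakeCollector):
        self.config = config
        self.collector = collector

    def build_product(self) -> Optional[MiniProduct]:
        users = self.collector.get_domain_users()
        return MiniProduct(users) if users else None


def run_mode(out_dir: str) -> bool:
    config = MiniConfig()
    MiniConfigurator(config, out_dir).setup()                       # 1. fill the config
    product = MiniBuilder(config, FakeCollector()).build_product()  # 2. build the product
    if product is None:                                             # the builder may fail
        return False
    product.save(config)                                            # 3. save the result
    return True


with tempfile.TemporaryDirectory() as tmp:
    ok = run_mode(os.path.join(tmp, 'demo'))
    with open(os.path.join(tmp, 'demo', 'domain_users.txt'), encoding='utf-8') as f:
        saved = f.read().splitlines()

print(ok, saved)
```

Swapping `FakeCollector` for a real collector leaves `run_mode` untouched, which is the point of keeping the config, the configurator and the builder separate.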
This article is intended primarily for novice developers. But perhaps more experienced colleagues will be able to learn something for themselves or point out the shortcomings.
NOTE: This article is for informational purposes only. We do not encourage you to commit any hacking. Everything you do is your responsibility.
TOX : 340EF1DCEEC5B395B9B45963F945C00238ADDEAC87C117F64F46206911474C61981D96420B72
Telegram : @DevSecAS