# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>
"""
OpenSearch module to manage creation of new OpenSearch domains
"""
import json
import re
from compose_x_common.compose_x_common import keyisset, keypresent
from troposphere import (
AWS_ACCOUNT_ID,
AWS_NO_VALUE,
AWS_PARTITION,
AWS_REGION,
AWS_URL_SUFFIX,
GetAtt,
Ref,
Sub,
Tags,
opensearchservice,
)
from troposphere.ec2 import SecurityGroup
from troposphere.iam import Role
from troposphere.logs import LogGroup, ResourcePolicy
from ecs_composex.common import NONALPHANUM
from ecs_composex.common.cfn_conditions import define_stack_name
from ecs_composex.common.logging import LOG
from ecs_composex.common.troposphere_tools import add_outputs, add_parameters
from ecs_composex.compose.compose_services.service_logging.helpers import (
get_closest_valid_log_retention_period,
)
from ecs_composex.iam import define_iam_policy
from ecs_composex.opensearch.opensearch_params import OS_DOMAIN_PORT, OS_DOMAIN_SG
from ecs_composex.resources_import import import_record_properties
from ecs_composex.secrets import add_db_secret
from ecs_composex.vpc.vpc_params import STORAGE_SUBNETS, VPC_ID
def validate_security_groups(domain, groups):
    """
    Validate the security group IDs the user set on the domain VPC options.

    Every entry must be a string made of the ``sg-`` prefix followed only by
    lowercase letters and digits. Validation stops at the first invalid entry,
    which is logged before the error is raised.

    :param ecs_composex.opensearch.opensearch_stack.OpenSearchDomain domain:
        Only ``domain.name`` is read, to build the messages.
    :param list groups: The values of ``VPCOptions.SecurityGroupIds``
    :raises: ValueError if an entry is not a string or is not a valid group ID
    """
    sg_id_re = re.compile(r"sg-[a-z0-9]+")
    for group in groups:
        if not isinstance(group, str):
            LOG.error(f"{domain.name} - Group {group} is not of type <str>")
        # fullmatch, not match: match only anchors the start of the string, so
        # values with trailing garbage (e.g. "sg-abc!!") used to be accepted.
        elif not sg_id_re.fullmatch(group):
            LOG.error(
                f"{domain.name} - Group {group} is not valid as pert (sg-[a-z0-9]+)"
            )
        else:
            continue
        raise ValueError(
            f"{domain.name} has SecurityGroupIds set but are not valid.", groups
        )
def define_domain_security_group(domain, stack):
    """
    Build the Security Group dedicated to the domain and add it to the stack template.

    The VPC ID parameter is added to the template first, since the group refers to it.

    :param ecs_composex.opensearch.opensearch_stack.OpenSearchDomain domain:
    :param ecs_composex.common.stacks.ComposeXStack stack:
    :return: The security group that was added to the template
    """
    template = stack.stack_template
    add_parameters(template, [VPC_ID])
    description = Sub(
        f"{domain.logical_name} OpenSearch SG in ${{STACK_NAME}}",
        STACK_NAME=define_stack_name(template),
    )
    security_group = SecurityGroup(
        f"{domain.logical_name}VPCSecurityGroup",
        GroupDescription=description,
        VpcId=Ref(VPC_ID),
        Tags=Tags(OsDomainName=domain.name),
    )
    template.add_resource(security_group)
    return security_group
def add_new_security_group(domain, properties, stack):
    """
    Attach a Compose-X managed Security Group to the domain VPC options.

    When the user already listed security groups, they are validated and the new
    group is appended to that list. Otherwise, when VPCOptions is set or the domain
    settings define Subnets, VPCOptions is (re)built with the new group and the
    subnets to use.

    :param ecs_composex.opensearch.opensearch_stack.OpenSearchDomain domain:
    :param dict properties:
    :param ecs_composex.common.stacks.ComposeXStack stack:
    """
    vpc_options_set = keyisset("VPCOptions", properties)

    if vpc_options_set and keyisset("SecurityGroupIds", properties["VPCOptions"]):
        existing_groups = properties["VPCOptions"]["SecurityGroupIds"]
        validate_security_groups(domain, existing_groups)
        LOG.warn(
            f"{domain.name} already has SecurityGroupIds set. Cannot verify its validity"
        )
        LOG.info(
            f"{domain.name} has already SecurityGroupIds set. "
            "Adding a new one for the purpose of Compose-X Automation"
        )
        domain.security_group = define_domain_security_group(domain, stack)
        existing_groups.append(Ref(domain.security_group))
        return

    # No user-defined groups from here on: only proceed when the domain is meant to be in a VPC.
    if not (
        vpc_options_set or (domain.settings and keyisset("Subnets", domain.settings))
    ):
        return

    domain.security_group = define_domain_security_group(domain, stack)
    subnet_ids = Ref(domain.subnets_override or STORAGE_SUBNETS)
    properties["VPCOptions"] = opensearchservice.VPCOptions(
        SecurityGroupIds=[Ref(domain.security_group)],
        SubnetIds=subnet_ids,
    )
def create_log_groups(domain, stack, props):
    """
    Create one CloudWatch log group per log type and wire them into LogPublishingOptions.

    ``CreateLogGroups`` is either a list of log types, or any other truthy value, in
    which case all the log types are enabled. When ``CreateLogGroupsResourcePolicy`` is
    set, a logs resource policy allowing the service to write to ``opensearch/*`` log
    groups is added to the template too.

    :param ecs_composex.opensearch.opensearch_stack.OpenSearchDomain domain:
    :param ecs_composex.common.stacks.ComposeXStack stack:
    :param dict props: Domain properties, ``LogPublishingOptions`` gets overridden.
    :raises: ValueError if a requested log type is not one of the supported ones
    """
    all_opts = [
        "SEARCH_SLOW_LOGS",
        "ES_APPLICATION_LOGS",
        "INDEX_SLOW_LOGS",
        "AUDIT_LOGS",
    ]
    requested = domain.parameters["CreateLogGroups"]
    opts_to_add = requested if isinstance(requested, list) else all_opts
    # Fail early: an unknown key would otherwise only be rejected at deploy time.
    unknown = [option for option in opts_to_add if option not in all_opts]
    if unknown:
        raise ValueError(
            f"{domain.name} - CreateLogGroups contains unsupported log types",
            unknown,
            all_opts,
        )

    # The retention is the same for every group, work it out once.
    retention = (
        get_closest_valid_log_retention_period(domain.parameters["RetentionInDays"])
        if keyisset("RetentionInDays", domain.parameters)
        else 30
    )

    opts = {}
    for option in opts_to_add:
        group_name = Sub(
            f"opensearch/${{STACK_NAME}}/{domain.logical_name}/{option}",
            STACK_NAME=define_stack_name(stack.stack_template),
        )
        log_group = LogGroup(
            f"{domain.logical_name}{NONALPHANUM.sub('', option)}LogGroup",
            LogGroupName=group_name,
            RetentionInDays=retention,
        )
        stack.stack_template.add_resource(log_group)
        opts[option] = {
            "Enabled": True,
            "CloudWatchLogsLogGroupArn": Sub(
                f"arn:${{{AWS_PARTITION}}}:logs:${{{AWS_REGION}}}:${{{AWS_ACCOUNT_ID}}}:"
                f"log-group:${{{log_group.title}}}"
            ),
        }

    if keyisset("CreateLogGroupsResourcePolicy", domain.parameters):
        logs_policy = ResourcePolicy(
            "OpenSearchLogGroupResourcePolicy",
            DeletionPolicy="Retain",
            PolicyName="ComposeXOpenSearchAccessToCWLogs",
            PolicyDocument=Sub(
                json.dumps(
                    {
                        "Version": "2012-10-17",
                        "Statement": [
                            {
                                "Sid": "AllowESDomainsToAccessLogGroupsInAllRegions",
                                "Effect": "Allow",
                                "Principal": {"Service": f"es.${{{AWS_URL_SUFFIX}}}"},
                                "Action": ["logs:PutLogEvents", "logs:CreateLogStream"],
                                "Resource": [
                                    f"arn:${{{AWS_PARTITION}}}:logs:*:${{{AWS_ACCOUNT_ID}}}:log-group:opensearch/*"
                                ],
                            }
                        ],
                    }
                )
            ),
        )
        stack.stack_template.add_resource(logs_policy)
    props["LogPublishingOptions"] = opts
def correcting_required_settings(domain, props):
    """
    Enforce the settings that go along with Advanced Security options:
    node-to-node encryption, encryption at rest and HTTPS-only endpoints.

    Settings missing from ``props`` are created enabled. Settings explicitly
    disabled are switched back on, with a warning for the encryption ones.

    :param ecs_composex.opensearch.opensearch_stack.OpenSearchDomain domain:
    :param dict props: Domain properties, updated in place.
    :return:
    """
    n2n_key = "NodeToNodeEncryptionOptions"
    if not keyisset(n2n_key, props):
        props[n2n_key] = opensearchservice.NodeToNodeEncryptionOptions(Enabled=True)
    else:
        n2n_options = props[n2n_key]
        # The setting lives in the domain properties: look at the object itself, the
        # same way EncryptionAtRestOptions is checked below.
        disabled_in_props = (
            hasattr(n2n_options, "Enabled") and n2n_options.Enabled is False
        )
        # Former lookup location, kept for backward compatibility. Guarded because the
        # caller invokes this function even when the domain has no parameters.
        disabled_in_parameters = bool(
            domain.parameters
            and keypresent(n2n_key, domain.parameters)
            and not domain.parameters[n2n_key]
        )
        if disabled_in_props or disabled_in_parameters:
            LOG.warn(
                "You have Advanced Security options enabled but NodeToNodeEncryptionOptions is disabled. Enabling"
            )
            props[n2n_key] = opensearchservice.NodeToNodeEncryptionOptions(
                Enabled=True
            )

    if keyisset("EncryptionAtRestOptions", props):
        crypt_options = props["EncryptionAtRestOptions"]
        if hasattr(crypt_options, "Enabled") and crypt_options.Enabled is False:
            LOG.warn(
                f"{domain.name} - With Advanced Security options, Encryption at rest must be enabled. Enabling"
            )
            setattr(crypt_options, "Enabled", True)
    else:
        props["EncryptionAtRestOptions"] = opensearchservice.EncryptionAtRestOptions(
            Enabled=True
        )

    if keyisset("DomainEndpointOptions", props):
        settings = props["DomainEndpointOptions"]
        setattr(settings, "EnforceHTTPS", True)
    else:
        props["DomainEndpointOptions"] = opensearchservice.DomainEndpointOptions(
            EnforceHTTPS=True,
        )
def generate_master_user(domain, stack, props):
    """
    Create the master user of the domain and set it in AdvancedSecurityOptions.

    With ``GenerateMasterUserSecret``, a secret is added to the template and the master
    user name/password resolve from it. With ``CreateMasterUserRole``, an IAM role
    assumable from the account is added and used as the master user ARN. Does nothing
    when neither parameter is set.

    :param ecs_composex.opensearch.opensearch_stack.OpenSearchDomain domain:
    :param ecs_composex.common.stacks.ComposeXStack stack:
    :param dict props: Domain properties, ``AdvancedSecurityOptions`` is set or updated.
    :return:
    """
    master_user_opts = {}
    iam_role = None
    if keyisset("GenerateMasterUserSecret", domain.parameters):
        domain.db_secret = add_db_secret(stack.stack_template, domain.logical_name)
        master_user_opts["MasterUserName"] = Sub(
            f"{{{{resolve:secretsmanager:${{{domain.db_secret.title}}}:SecretString:username}}}}"
        )
        master_user_opts["MasterUserPassword"] = Sub(
            f"{{{{resolve:secretsmanager:${{{domain.db_secret.title}}}:SecretString:password}}}}"
        )
        LOG.info(f"{domain.name} - Created Secret for MasterUser")
    if keyisset("CreateMasterUserRole", domain.parameters):
        LOG.info(f"{domain.name} - adding MasterUserARN")
        statement = {
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    Sub(f"arn:${{{AWS_PARTITION}}}:iam::${{{AWS_ACCOUNT_ID}}}:root")
                ]
            },
            "Action": ["sts:AssumeRole"],
            "Condition": {"Bool": {"aws:SecureTransport": "true"}},
        }
        policy_doc = {"Version": "2012-10-17", "Statement": [statement]}
        iam_role = Role(
            f"{domain.logical_name}MasterUserRole",
            PermissionsBoundary=(
                define_iam_policy(
                    domain.parameters["MasterUserRolePermissionsBoundary"]
                )
                if keyisset("MasterUserRolePermissionsBoundary", domain.parameters)
                else Ref(AWS_NO_VALUE)
            ),
            AssumeRolePolicyDocument=policy_doc,
        )
        stack.stack_template.add_resource(iam_role)
        master_user_opts["MasterUserARN"] = GetAtt(iam_role, "Arn")
    if not master_user_opts:
        return

    # The internal user database goes with a name/password master user, not with a role.
    use_internal_db = bool(domain.db_secret)
    if not keyisset("AdvancedSecurityOptions", props):
        props["AdvancedSecurityOptions"] = (
            opensearchservice.AdvancedSecurityOptionsInput(
                Enabled=True,
                InternalUserDatabaseEnabled=use_internal_db,
                MasterUserOptions=opensearchservice.MasterUserOptions(
                    **master_user_opts
                ),
            )
        )
        return

    LOG.warn(
        f"{domain.name}.Properties.AdvancedSecurityOptions is set as well as MacroParameters. Overriding"
    )
    security_opts = props["AdvancedSecurityOptions"]
    # The property is optional: read it defensively, as done with hasattr() for the
    # other optional properties, instead of failing when the user left it out.
    internal_db = getattr(security_opts, "InternalUserDatabaseEnabled", None)
    if internal_db is True and iam_role:
        LOG.warn(
            f"{domain.name} - When using CreateMasterUserRole, you cannot use the Internal DB"
        )
        setattr(security_opts, "InternalUserDatabaseEnabled", False)
    elif internal_db is False and domain.db_secret:
        LOG.warn(
            f"{domain.name} - When using GenerateMasterUserSecret, you must use the Internal DB"
        )
        setattr(security_opts, "InternalUserDatabaseEnabled", True)
    elif internal_db is None:
        # Not set by the user: same value as when the options are created from scratch.
        setattr(security_opts, "InternalUserDatabaseEnabled", use_internal_db)
    setattr(
        security_opts,
        "MasterUserOptions",
        opensearchservice.MasterUserOptions(**master_user_opts),
    )
    setattr(security_opts, "Enabled", True)
def apply_domain_parameters(domain, stack, props):
    """
    Apply the Compose-X specific parameters of the domain onto its properties.

    Creates the log groups when requested, then the master user, either as an IAM role
    or as a generated secret. The master user is skipped, with an error logged, when the
    properties explicitly enable the internal user database.

    :param ecs_composex.opensearch.opensearch_stack.OpenSearchDomain domain:
    :param ecs_composex.common.stacks.ComposeXStack stack:
    :param dict props:
    :raises: ValueError if both CreateMasterUserRole and GenerateMasterUserSecret are set
    """
    if keyisset("CreateLogGroups", domain.parameters):
        create_log_groups(domain, stack, props)

    wants_role = keyisset("CreateMasterUserRole", domain.parameters)
    wants_secret = keyisset("GenerateMasterUserSecret", domain.parameters)
    if wants_role and wants_secret:
        raise ValueError(
            "You cannot have both a MasterRole and MasterUser at the same time."
        )
    if not (wants_role or wants_secret):
        return

    # InternalUserDatabaseEnabled is optional: getattr() with a default avoids an
    # AttributeError when AdvancedSecurityOptions is set without it.
    internal_db_enabled = (
        keyisset("AdvancedSecurityOptions", props)
        and getattr(
            props["AdvancedSecurityOptions"], "InternalUserDatabaseEnabled", None
        )
        is True
    )
    if internal_db_enabled:
        LOG.error(
            f"{domain.name} - You have defined InternalUserDatabaseEnabled to True. MasterUser cannot be used"
        )
    else:
        generate_master_user(domain, stack, props)
def validate_instance_types_config(domain, props, instance_type, config):
    """
    Check the domain properties against what the instance type allows and requires.

    ``config["not_supported"]`` and ``config["must_have"]`` both map a top-level
    property name to the name of a flag on that property. Only properties present in
    ``props`` and having the flag set are checked.

    :param ecs_composex.opensearch.opensearch_stack.OpenSearchDomain domain:
    :param dict props:
    :param str instance_type:
    :param dict config:
    :raises: ValueError if features are not compatible with the instance type
    """
    # Properties that must be dropped altogether rather than left with the flag on False.
    nullable_when_disabled = ["EBSOptions"]

    if keyisset("not_supported", config):
        for section, flag in config["not_supported"].items():
            if not (keyisset(section, props) and hasattr(props[section], flag)):
                continue
            if getattr(props[section], flag) is not False:
                raise ValueError(
                    f"{domain.name} - Property {section}.{flag} is enabled, but is "
                    f"incompatible with {instance_type} instances type"
                )
            if section in nullable_when_disabled:
                LOG.warn(
                    f"{domain.name} - {section}.{flag} is False but the property must be null. "
                    "Overriding to AWS::NoValue"
                )
                props[section] = Ref(AWS_NO_VALUE)

    if keyisset("must_have", config):
        for section, flag in config["must_have"].items():
            if not (keyisset(section, props) and hasattr(props[section], flag)):
                continue
            if getattr(props[section], flag) is not True:
                raise ValueError(
                    f"{domain.name} - {instance_type} requires {section}.{flag} to be True"
                )
def validate_version_support(domain, props, instance_type, config):
    """
    Check that the engine version of the domain is recent enough for the instance type.

    Nothing is checked when the domain has no ``EngineVersion``, or when ``config``
    defines no minimum version for the engine in use.

    :param ecs_composex.opensearch.opensearch_stack.OpenSearchDomain domain:
    :param dict props:
    :param str instance_type:
    :param dict config: May contain ``EngineVersion``, mapping the engine name to the
        minimum version supported. Use a string for minimums such as ``"7.10"``, which
        a float cannot represent.
    :raises: ValueError if the version is malformed or too old for the instance type
    """
    engine_version = props.get("EngineVersion")
    engine_support = config.get("EngineVersion")
    if not engine_version or not engine_support:
        return

    version_re = re.compile(
        r"(?P<engine>OpenSearch|Elasticsearch)_(?P<version>\d+\.\d+)$"
    )
    parsed = version_re.match(engine_version)
    if not parsed:
        raise ValueError(
            f"{domain.name} - EngineVersion {engine_version} is not valid. Must match",
            version_re.pattern,
        )
    engine_name = parsed.group("engine")
    supported_version = engine_support.get(engine_name)
    if not supported_version:
        return

    def _version_tuple(version):
        # Versions are not decimal numbers: 7.10 is newer than 7.9, which a float
        # comparison gets wrong (7.1 < 7.9). Compare the numeric parts instead.
        return tuple(int(part) for part in str(version).split("."))

    if _version_tuple(parsed.group("version")) < _version_tuple(supported_version):
        raise ValueError(
            f"{domain.name} - EngineVersion {engine_version} is not supported. "
            f"{instance_type} Requires >={engine_name}_{supported_version}"
        )
def validate_no_architecture_mix(domain, types):
    """
    Make sure the instance families are either all Graviton or all non-Graviton.

    A family counts as Graviton when it starts with a letter and a digit followed
    by ``g`` or ``gd``, and nothing else.

    :param ecs_composex.opensearch.opensearch_stack.OpenSearchDomain domain:
    :param list[str] types:
    :raises: ValueError if not all instances are of the same architecture
    """
    graviton_family = re.compile(r"[a-z]\d(g$|gd$)")
    # One entry per architecture found: more than one means the families are mixed.
    architectures = {graviton_family.match(family) is not None for family in types}
    if len(architectures) > 1:
        raise ValueError(
            f"{domain.name} - Not all instances are of the same architecture", types
        )
def correct_properties(domain, props):
    """
    Fix property combinations that are invalid once put together.

    EBSOptions with EBSEnabled explicitly on False is replaced with AWS::NoValue.

    :param ecs_composex.opensearch.opensearch_stack.OpenSearchDomain domain:
    :param dict props:
    """
    if not keyisset("EBSOptions", props):
        return
    ebs_options = props["EBSOptions"]
    if hasattr(ebs_options, "EBSEnabled") and ebs_options.EBSEnabled is False:
        props["EBSOptions"] = Ref(AWS_NO_VALUE)
def validate_instance_types(domain, props):
    """
    Validate the instance types of the cluster against the rest of the domain settings.

    For each of DedicatedMasterType, InstanceType and WarmType set on ClusterConfig, the
    instance family (the part before the first dot) must be a known one. Its engine
    version and feature constraints are then checked. Finally, the families must not mix
    architectures, and incompatible leftovers in the properties are corrected.

    :param ecs_composex.opensearch.opensearch_stack.OpenSearchDomain domain:
    :param dict props:
    :raises: ValueError if an instance family is unknown
    """
    # Constraints per instance family: minimum engine versions, flags that must be off
    # (not_supported) and flags that must be on (must_have).
    instance_types = {
        "c4": {},
        "c5": {"EngineVersion": {"Elasticsearch": 5.1, "OpenSearch": 1.0}},
        "c6g": {
            "EngineVersion": {"Elasticsearch": 7.9, "OpenSearch": 1.0},
            "must_have": {"EBSOptions": "EBSEnabled"},
        },
        "i2": {},
        "i3": {
            "EngineVersion": {"Elasticsearch": 5.1, "OpenSearch": 1.0},
            "not_supported": {"EBSOptions": "EBSEnabled"},
        },
        "m3": {
            "EngineVersion": {"Elasticsearch": 6.5, "OpenSearch": 1.0},
            "not_supported": {
                "EncryptionAtRestOptions": "Enabled",
                "AdvancedSecurityOptions": "Enabled",
            },
        },
        "m4": {},
        "m5": {"EngineVersion": {"Elasticsearch": 5.1, "OpenSearch": 1.0}},
        "m6g": {
            "EngineVersion": {"Elasticsearch": 7.9, "OpenSearch": 1.0},
            "must_have": {"EBSOptions": "EBSEnabled"},
        },
        "r3": {
            "EngineVersion": {"OpenSearch": 1.0, "Elasticsearch": 6.5},
            "not_supported": {
                "EncryptionAtRestOptions": "Enabled",
                "AdvancedSecurityOptions": "Enabled",
            },
        },
        "r4": {},
        "r5": {"EngineVersion": {"OpenSearch": 1.0, "Elasticsearch": 5.1}},
        "r6g": {
            "EngineVersion": {"OpenSearch": 1.0, "Elasticsearch": 7.9},
            "must_have": {"EBSOptions": "EBSEnabled"},
        },
        "r6gd": {
            "EngineVersion": {"OpenSearch": 1.0, "Elasticsearch": 7.9},
            "not_supported": {"EBSOptions": "EBSEnabled"},
        },
        "t2": {
            "EngineVersion": {"OpenSearch": 1.0, "Elasticsearch": 6.5},
            "not_supported": {
                "EncryptionAtRestOptions": "Enabled",
                "AdvancedSecurityOptions": "Enabled",
                "ClusterConfig": "WarmEnabled",
            },
        },
        "t3": {
            "EngineVersion": {"OpenSearch": 1.0, "Elasticsearch": 6.5},
            "not_supported": {
                "EncryptionAtRestOptions": "Enabled",
                "AdvancedSecurityOptions": "Enabled",
                "ClusterConfig": "WarmEnabled",
            },
        },
    }
    if not keyisset("ClusterConfig", props):
        return

    cluster_config = props["ClusterConfig"]
    families_in_use = []
    for type_property in ("DedicatedMasterType", "InstanceType", "WarmType"):
        if not hasattr(cluster_config, type_property):
            continue
        family = getattr(cluster_config, type_property).split(".")[0]
        if family not in instance_types:
            raise ValueError(
                f"{domain.name} - Instance Type for {type_property} is not valid",
                getattr(cluster_config, type_property),
                list(instance_types),
            )
        families_in_use.append(family)
        constraints = instance_types[family]
        validate_version_support(domain, props, family, constraints)
        validate_instance_types_config(domain, props, family, constraints)

    validate_no_architecture_mix(domain, families_in_use)
    correct_properties(domain, props)
def create_new_domains(new_domains, stack):
    """
    Add the OpenSearch domains to create, and their outputs, to the stack template.

    For each domain: import the properties, add the security group when the domain is in
    a VPC, apply the Compose-X parameters, enforce the settings Advanced Security needs,
    validate the instance types, then add the resource and its outputs to the template.

    :param list[ecs_composex.opensearch.opensearch_stack.OpenSearchDomain] new_domains:
    :param ecs_composex.common.stacks.ComposeXStack stack:
    """
    for domain in new_domains:
        domain.set_override_subnets()
        props = import_record_properties(domain.properties, opensearchservice.Domain)

        in_vpc = keyisset("VPCOptions", props) or domain.subnets_override
        if in_vpc:
            add_new_security_group(domain, props, stack)
        if domain.parameters:
            apply_domain_parameters(domain, stack, props)
        if keyisset("AdvancedSecurityOptions", props):
            correcting_required_settings(domain, props)
        validate_instance_types(domain, props)

        domain.cfn_resource = opensearchservice.Domain(domain.logical_name, **props)
        domain.init_outputs()
        stack.stack_template.add_resource(domain.cfn_resource)
        domain.generate_outputs()

        if domain.security_group:
            # Security group and port are exposed together when the domain has a group.
            sg_output = (
                f"{domain.logical_name}{OS_DOMAIN_SG.return_value}",
                domain.security_group,
                GetAtt,
                OS_DOMAIN_SG.return_value,
            )
            domain.add_new_output_attribute(OS_DOMAIN_SG, sg_output)
            port_output = (
                f"{domain.logical_name}{OS_DOMAIN_PORT.title}",
                OS_DOMAIN_PORT.Default,
                OS_DOMAIN_PORT.Default,
                False,
            )
            domain.add_new_output_attribute(OS_DOMAIN_PORT, port_output)
            add_parameters(stack.stack_template, [OS_DOMAIN_PORT])
        add_outputs(stack.stack_template, domain.outputs)