# Source code for document.agent.catalog

from elasticsearch_dsl import Boolean, Nested, Text

from document.base import BaseDocument, BaseInnerDoc


class AgentCatalogActionConfigInnerDoc(BaseInnerDoc):
    """Agent action configuration.

    Inner document used as the nested ``config`` field of
    ``AgentCatalogActionInnerDoc``.
    """

    # Mandatory text field (presumably the command to execute -- confirm).
    cmd = Text(required=True)
    # Optional text field (presumably the command arguments -- confirm).
    args = Text()
    # Optional boolean flag (presumably "run as a daemon" -- confirm).
    daemon = Boolean()


class AgentCatalogActionInnerDoc(BaseInnerDoc):
    """Agent action.

    Inner document describing one action of a catalog agent; the
    execution settings live in the nested ``config`` document.
    """

    id = Text(required=True)
    config = Nested(AgentCatalogActionConfigInnerDoc, required=True)
    status = Text()
    description = Text()
    example = Text()

    @staticmethod
    def from_agent_type(agent_type, container):
        """Build or update an action inner document from a plain dict.

        The keys ``config``, ``id``, ``daemon`` and ``args`` are popped
        from ``agent_type`` (the dict is modified in place); every
        remaining key is copied onto the document as an attribute.

        ``daemon`` and ``args`` are accepted both at the top level of
        ``agent_type`` and inside its ``config`` dict; when both are
        given the value inside ``config`` wins, mirroring how
        ``AgentCatalogParameterInnerDoc.from_agent_type`` resolves its
        own configuration keys.

        :param agent_type: dict describing the action.
        :param container: collection handed to ``get_or_new`` together
            with the action id (presumably the list the action is looked
            up in / added to -- confirm against ``BaseInnerDoc``).
        :returns: the ``AgentCatalogActionInnerDoc`` instance.
        """
        # ``or {}`` also covers an explicit ``"config": None``, which
        # would otherwise fail on ``cfg.pop`` below.
        cfg = agent_type.pop("config", None) or {}
        _id = agent_type.pop("id", None)
        cmd = cfg.pop("cmd", None)
        # The top-level keys are always popped so they never reach the
        # generic ``setattr`` loop; the nested value takes precedence.
        daemon = cfg.pop("daemon", agent_type.pop("daemon", None))
        args = cfg.pop("args", agent_type.pop("args", None))
        obj = AgentCatalogActionInnerDoc.get_or_new(
            id=_id, container=container
        )
        for field, data in agent_type.items():
            setattr(obj, field, data)
        # Without a command the existing ``config`` is left untouched.
        if cmd:
            obj.config = AgentCatalogActionConfigInnerDoc(
                cmd=cmd, daemon=daemon, args=args)
        return obj


class AgentCatalogParameterConfigInnerDoc(BaseInnerDoc):
    """Agent parameter configuration.

    Inner document used as the nested ``config`` field of
    ``AgentCatalogParameterInnerDoc``. All three fields are mandatory
    text fields.
    """

    # NOTE(review): presumably the format of the configuration source
    # (e.g. yaml/json/properties) -- confirm.
    schema = Text(required=True)
    # NOTE(review): presumably the file or location holding the
    # parameter -- confirm.
    source = Text(required=True)
    # NOTE(review): presumably the location of the parameter inside
    # ``source`` -- confirm.
    path = Text(required=True)


class AgentCatalogParameterInnerDoc(BaseInnerDoc):
    """Agent parameter.

    Inner document describing one configurable parameter of a catalog
    agent; where the parameter is stored is described by the nested
    ``config`` document.
    """

    id = Text(required=True)
    # possible values: integer, number, time-duration,
    #                  string, choice, boolean, binary
    type = Text(required=True)
    config = Nested(AgentCatalogParameterConfigInnerDoc, required=True)
    list = Boolean()
    values = Text()  # when type = choice
    description = Text()
    example = Text()

    @staticmethod
    def from_agent_type(agent_type, container, schema, source):
        """Build or update a parameter inner document from a plain dict.

        ``id``, ``path``, ``schema``, ``source`` and ``config`` are
        popped from ``agent_type`` (modified in place); all other keys
        are copied onto the document as attributes.

        Resolution order for ``schema`` and ``source``, lowest to
        highest priority: the function arguments, the top-level keys of
        ``agent_type``, the keys of its ``config`` dict. ``path`` follows
        the same order without an argument default.

        :param agent_type: dict describing the parameter.
        :param container: collection passed to ``get_or_new`` with the
            parameter id.
        :param schema: fallback schema when the dict defines none.
        :param source: fallback source when the dict defines none.
        :returns: the ``AgentCatalogParameterInnerDoc`` instance.
        """
        param_id = agent_type.pop("id", None)
        # Start from the top-level values (or the caller's defaults).
        settings = {
            "path": agent_type.pop("path", None),
            "schema": agent_type.pop("schema", schema),
            "source": agent_type.pop("source", source),
        }
        nested = agent_type.pop("config", None)
        if nested is not None:
            # Values given inside ``config`` override everything else.
            for key in ("schema", "source", "path"):
                settings[key] = nested.pop(key, settings[key])
        param = AgentCatalogParameterInnerDoc.get_or_new(
            id=param_id, container=container
        )
        for name, value in agent_type.items():
            setattr(param, name, value)
        param.config = AgentCatalogParameterConfigInnerDoc(**settings)
        return param


class AgentCatalogResourceConfigInnerDoc(BaseInnerDoc):
    """Agent resource configuration.

    Inner document used as the nested ``config`` field of
    ``AgentCatalogResourceInnerDoc``.
    """

    # Mandatory text field (presumably where the resource is stored on
    # the agent side -- confirm).
    path = Text(required=True)


class AgentCatalogResourceInnerDoc(BaseInnerDoc):
    """Agent resource.

    Inner document describing one resource of a catalog agent. Unlike
    the action and parameter inner documents in this module, it defines
    no ``from_agent_type`` helper.
    """

    # Mandatory identifier of the resource.
    id = Text(required=True)
    # Mandatory nested configuration (holds the resource ``path``).
    config = Nested(AgentCatalogResourceConfigInnerDoc, required=True)
    # Optional free-text fields.
    description = Text()
    example = Text()


class AgentCatalogDocument(BaseDocument):
    """Represents an agent in the catalog."""

    # id already defined by Elasticsearch
    actions = Nested(AgentCatalogActionInnerDoc)
    parameters = Nested(AgentCatalogParameterInnerDoc)
    resources = Nested(AgentCatalogResourceInnerDoc)
    description = Text()
    deployment_source = Text()

    class Index:
        """Elasticsearch configuration."""

        name = "agent-catalog"

    @staticmethod
    def from_agent_type(agent_type):
        """Create or update a catalog document from a plain dict and save it.

        ``id``, ``source``, ``schema``, ``actions`` and ``parameters``
        are popped from ``agent_type`` (modified in place). ``source``
        and ``schema`` are only used as defaults for the parameters.
        Every remaining key -- including ``resources``, which gets no
        special handling here -- is copied onto the document as an
        attribute before the document is saved.

        :param agent_type: dict describing the agent.
        :returns: ``None``; the document is persisted via ``save()``.
        """
        _id = agent_type.pop("id", None)
        source = agent_type.pop("source", None)
        schema = agent_type.pop("schema", None)
        obj = AgentCatalogDocument.get_or_new(id=_id)
        # Actions are resolved against the actions already stored on
        # the document.
        for action in agent_type.pop("actions", []):
            AgentCatalogActionInnerDoc.from_agent_type(
                action, container=obj.actions
            )
        # Parameters, in contrast, are rebuilt from scratch.
        obj.parameters = []
        for param in agent_type.pop("parameters", []):
            AgentCatalogParameterInnerDoc.from_agent_type(
                param, container=obj.parameters, schema=schema, source=source
            )
        for field, data in agent_type.items():
            setattr(obj, field, data)
        obj.save()