Source code for reader.query

# Copyright (c) 2020-2022 GUARD
# author: Alex Carrega <alessandro.carrega@cnit.it>

from bunch import Bunch
from elasticsearch import RequestError
from elasticsearch_dsl import Q, Search
from falcon.errors import HTTPBadRequest

from utils.log import Log
from utils.sequence import is_dict, is_list

MSG_NOT_VALID_JSON = {
    "title": "Not valid JSON",
    "description": "The request body is not a valid JSON or it is not encoded as UTF-8.",  # noqa: E501
}
MSG_UNKNOWN = "{} unknown"
MSG_REQ_NOT_VALID = {
    "title": "Request not valid",
    "description": "Order with not valid/missing data",
}
MSG_CLAUSE_NOT_VALID = Bunch(
    title="Request not valid",
    description="{} clause with not valid/missing data",
)


[docs]class QueryReader: def __init__(self, index): self.search = Search(index=index) def parse(self, query, item_id=None): try: self.__select(query) self.search.query = self.__where(query, _id=item_id) self.__order(query) self.__limit(query) except RequestError as req_err: raise HTTPBadRequest(title=req_err.error, description=req_err.info) except HTTPBadRequest as http_bad_req: raise http_bad_req except Exception as exception: Log.get("query-reader").error(f"Exception: {exception}") raise HTTPBadRequest(**MSG_NOT_VALID_JSON) # noqa: E501 return self.search def __select(self, query): self.search = self.search.source(query.get("select", None)) def __where(self, query, _id=None): query_obj = None for operator, clause in query.get("where", {}).items(): if operator == "and": if is_dict(clause): for sub_op, sub_clause in clause.items(): if query_obj is None: query_obj = self.__where( {"where": {sub_op: sub_clause}} ) else: query_obj = query_obj & self.__where( {"where": {sub_op: sub_clause}} ) elif is_list(clause): for sub_clause in clause: if query_obj is None: query_obj = self.__where({"where": sub_clause}) else: query_obj = query_obj & self.__where( {"where": sub_clause} ) elif operator == "or": if is_dict(clause): for sub_op, sub_clause in clause.items(): if query_obj is None: query_obj = self.__where( {"where": {sub_op: sub_clause}} ) else: query_obj = query_obj | self.__where( {"where": {sub_op: sub_clause}} ) elif is_list(clause): for sub_clause in clause: if query_obj is None: query_obj = self.__where({"where": sub_clause}) else: query_obj = query_obj | self.__where( {"where": sub_clause} ) elif operator == "not": query_obj = ~self.__where(clause) else: prop = self.__fix_target(clause.get("target", None)) expr = clause.get("expr", None) if prop is None or expr is None: raise HTTPBadRequest( title=MSG_CLAUSE_NOT_VALID.title, description=MSG_CLAUSE_NOT_VALID.description.format( operator ), ) if operator == "equals": query_obj = Q("match_phrase", **{prop: expr}) elif operator == "reg_exp": query_obj = Q("regexp", **{prop: {"value": expr}}) elif operator == "wildcard": query_obj = Q("wildcard", **{prop: {"value": expr}}) elif operator in ["lt", "lte", "gt", "gte"]: query_obj = Q("range", **{prop: {operator: expr}}) else: raise HTTPBadRequest( title=MSG_UNKNOWN.format("Operation"), description=MSG_UNKNOWN.format(operator), ) # noqa: E501 if _id is not None: if query_obj is None: query_obj = Q("term", _id=_id) else: query_obj = query_obj & Q("term", _id=_id) return query_obj if query_obj is not None else Q() def __order(self, query): sort_list = [] for order in query.get("order", []): prop = self.__fix_target(order.get("target", None)) mode = order.get("mode", None) if prop is not None and mode is not None: sort_list.append(prop if mode == "asc" else f"-{prop}") else: raise HTTPBadRequest(**MSG_REQ_NOT_VALID) # noqa: E501 self.search = self.search.sort(*sort_list) def __limit(self, query): limit = query.get("limit", {}) start = limit.get("from", 0) end = limit.get("to", self.search.count() - 1) self.search = self.search[start : (end + 1)] # noqa: E203 @staticmethod def __fix_target(prop): return "_id" if prop == "id" else prop