LCOV - code coverage report
Current view: top level - source3/rpc_server/mdssvc - mdssvc_es.c (source / functions) Hit Total Coverage
Test: coverage report for master 98b443d9 Lines: 260 397 65.5 %
Date: 2024-05-31 13:13:24 Functions: 18 22 81.8 %

          Line data    Source code
       1             : /*
       2             :    Unix SMB/CIFS implementation.
       3             :    Main metadata server / Spotlight routines / ES backend
       4             : 
       5             :    Copyright (C) Ralph Boehme                   2019
       6             : 
       7             :    This program is free software; you can redistribute it and/or modify
       8             :    it under the terms of the GNU General Public License as published by
       9             :    the Free Software Foundation; either version 3 of the License, or
      10             :    (at your option) any later version.
      11             : 
      12             :    This program is distributed in the hope that it will be useful,
      13             :    but WITHOUT ANY WARRANTY; without even the implied warranty of
      14             :    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      15             :    GNU General Public License for more details.
      16             : 
      17             :    You should have received a copy of the GNU General Public License
      18             :    along with this program.  If not, see <http://www.gnu.org/licenses/>.
      19             : */
      20             : 
      21             : #include "includes.h"
      22             : #include "system/filesys.h"
      23             : #include "lib/util/time_basic.h"
      24             : #include "lib/tls/tls.h"
      25             : #include "lib/util/tevent_ntstatus.h"
      26             : #include "libcli/http/http.h"
      27             : #include "lib/util/tevent_unix.h"
      28             : #include "credentials.h"
      29             : #include "mdssvc.h"
      30             : #include "mdssvc_es.h"
      31             : #include "rpc_server/mdssvc/es_parser.tab.h"
      32             : #include "lib/param/param.h"
      33             : 
      34             : #include <jansson.h>
      35             : 
      36             : #undef DBGC_CLASS
      37             : #define DBGC_CLASS DBGC_RPC_SRV
      38             : 
      /*
       * printf-style template for a JSON search request body. The
       * conversions are, in order: "from" (%zu), "size" (%zu), the contents
       * of the "_source" array (%s) and the "query" text of the
       * "query_string" object (%s). The last %s is placed between JSON
       * double quotes by the template itself.
       */
      39             : #define MDSSVC_ELASTIC_QUERY_TEMPLATE   \
      40             :         "{"                           \
      41             :         "    \"from\": %zu,"                \
      42             :         "    \"size\": %zu,"                \
      43             :         "    \"_source\": [%s],"    \
      44             :         "    \"query\": {"          \
      45             :         "        \"query_string\": {"       \
      46             :         "            \"query\": \"%s\"" \
      47             :         "        }"                   \
      48             :         "    }"                               \
      49             :         "}"
      50             : 
      /*
       * A single quoted JSON string, "path.real". Presumably the value
       * substituted for the "_source" placeholder above -- the use site is
       * not part of the visible listing, TODO confirm.
       */
      51             : #define MDSSVC_ELASTIC_SOURCES \
      52             :         "\"path.real\""
      53             : 
      /*
       * Set up the Elasticsearch backend state for an mdssvc context.
       *
       * Allocates a struct mdssvc_es_ctx as a talloc child of mdssvc_ctx,
       * creates anonymous credentials, loads the JSON attribute mappings
       * file and stores the result in mdssvc_ctx->backend_private.
       *
       * The mappings file path is taken from the global "elasticsearch" /
       * "mappings" parametric option and defaults to
       * <SAMBA_DATADIR>/mdssvc/elasticsearch_mappings.json.
       *
       * Returns true on success. On any failure the partially built
       * mdssvc_es_ctx is freed and false is returned.
       *
       * NOTE(review): the json_t returned by json_load_file() is not
       * released anywhere in the visible part of this file (no talloc
       * destructor is set here and mdssvc_es_shutdown() does nothing) --
       * confirm whether it is dereferenced elsewhere.
       */
      54           4 : static bool mdssvc_es_init(struct mdssvc_ctx *mdssvc_ctx)
      55             : {
      56           4 :         struct mdssvc_es_ctx *mdssvc_es_ctx = NULL;
      57             :         json_error_t json_error;
      58           4 :         char *default_path = NULL;
      59           4 :         const char *path = NULL;
      60             : 
      61           4 :         mdssvc_es_ctx = talloc_zero(mdssvc_ctx, struct mdssvc_es_ctx);
      62           4 :         if (mdssvc_es_ctx == NULL) {
      63           0 :                 return false;
      64             :         }
      65           4 :         mdssvc_es_ctx->mdssvc_ctx = mdssvc_ctx;
      66             : 
      67           4 :         mdssvc_es_ctx->creds = cli_credentials_init_anon(mdssvc_es_ctx);
      68           4 :         if (mdssvc_es_ctx->creds == NULL) {
      69           0 :                 TALLOC_FREE(mdssvc_es_ctx);
      70           0 :                 return false;
      71             :         }
      72             : 
                               /*
                                * default_path hangs off mdssvc_es_ctx, so the error
                                * paths below release it together with the context.
                                */
      73           4 :         default_path = talloc_asprintf(
      74             :                 mdssvc_es_ctx,
      75             :                 "%s/mdssvc/elasticsearch_mappings.json",
      76             :                 get_dyn_SAMBA_DATADIR());
      77           4 :         if (default_path == NULL) {
      78           0 :                 TALLOC_FREE(mdssvc_es_ctx);
      79           0 :                 return false;
      80             :         }
      81             : 
      82           4 :         path = lp_parm_const_string(GLOBAL_SECTION_SNUM,
      83             :                                     "elasticsearch",
      84             :                                     "mappings",
      85             :                                     default_path);
      86           4 :         if (path == NULL) {
      87           0 :                 TALLOC_FREE(mdssvc_es_ctx);
      88           0 :                 return false;
      89             :         }
      90             : 
      91           4 :         mdssvc_es_ctx->mappings = json_load_file(path, 0, &json_error);
      92           4 :         if (mdssvc_es_ctx->mappings == NULL) {
      93           0 :                 DBG_ERR("Opening mapping file [%s] failed: %s\n",
      94             :                         path, json_error.text);
      95           0 :                 TALLOC_FREE(mdssvc_es_ctx);
      96           0 :                 return false;
      97             :         }
                               /*
                                * path may alias default_path, so the default is only
                                * released once the file has been loaded.
                                */
      98           4 :         TALLOC_FREE(default_path);
      99             : 
     100           4 :         mdssvc_ctx->backend_private = mdssvc_es_ctx;
     101           4 :         return true;
     102             : }
     103             : 
     /*
      * Backend shutdown hook. Performs no work and always returns true; in
      * particular nothing is released explicitly here.
      */
     104           4 : static bool mdssvc_es_shutdown(struct mdssvc_ctx *mdssvc_ctx)
     105             : {
     106           4 :         return true;
     107             : }
     108             : 
     /*
      * Forward declarations of the connect request, its completion callback
      * and the search scheduling helpers, all defined further down.
      */
     109             : static struct tevent_req *mds_es_connect_send(
     110             :                                 TALLOC_CTX *mem_ctx,
     111             :                                 struct tevent_context *ev,
     112             :                                 struct mds_es_ctx *mds_es_ctx);
     113             : static int mds_es_connect_recv(struct tevent_req *req);
     114             : static void mds_es_connected(struct tevent_req *subreq);
     115             : static bool mds_es_next_search_trigger(struct mds_es_ctx *mds_es_ctx);
     116             : static void mds_es_search_set_pending(struct sl_es_search *s);
     117             : static void mds_es_search_unset_pending(struct sl_es_search *s);
     118             : 
     /*
      * talloc destructor for struct mds_es_ctx (installed by
      * mds_es_connect()).
      *
      * If the searches list is non-empty, the mds_es_ctx back pointer of the
      * list head is cleared so that the search can tell its context is gone.
      * Only the head element is touched. Always returns 0.
      */
     119          12 : static int mds_es_ctx_destructor(struct mds_es_ctx *mds_es_ctx)
     120             : {
     121          12 :         struct sl_es_search *s = mds_es_ctx->searches;
     122             : 
     123             :         /*
     124             :          * The per tree-connect state mds_es_ctx (a child of mds_ctx) is about
     125             :          * to go away and has already freed all waiting searches. If there's a
     126             :          * search remaining that's when the search is already active. Reset the
     127             :          * mds_es_ctx pointer, so we can detect this when the search completes.
     128             :          */
     129             : 
     130          12 :         if (s == NULL) {
     131          12 :                 return 0;
     132             :         }
     133             : 
     134           0 :         s->mds_es_ctx = NULL;
     135             : 
     136           0 :         return 0;
     137             : }
     138             : 
     /*
      * Create the per-mds_ctx Elasticsearch state and start connecting.
      *
      * Allocates a struct mds_es_ctx as a talloc child of mds_ctx, stores it
      * in mds_ctx->backend_private, installs mds_es_ctx_destructor() and
      * starts an asynchronous connect request whose completion is handled by
      * mds_es_connected().
      *
      * Returns true once the connect request has been started (not when the
      * connection is established), false if an allocation or the request
      * creation fails.
      */
     139          12 : static bool mds_es_connect(struct mds_ctx *mds_ctx)
     140             : {
     141          12 :         struct mdssvc_es_ctx *mdssvc_es_ctx = talloc_get_type_abort(
     142             :                 mds_ctx->mdssvc_ctx->backend_private, struct mdssvc_es_ctx);
     143          12 :         struct mds_es_ctx *mds_es_ctx = NULL;
     144          12 :         struct tevent_req *subreq = NULL;
     145             : 
     146          12 :         mds_es_ctx = talloc_zero(mds_ctx, struct mds_es_ctx);
     147          12 :         if (mds_es_ctx == NULL) {
     148           0 :                 return false;
     149             :         }
     150          12 :         *mds_es_ctx = (struct mds_es_ctx) {
     151             :                 .mdssvc_es_ctx = mdssvc_es_ctx,
     152             :                 .mds_ctx = mds_ctx,
     153             :         };
     154             : 
     155          12 :         mds_ctx->backend_private = mds_es_ctx;
     156          12 :         talloc_set_destructor(mds_es_ctx, mds_es_ctx_destructor);
     157             : 
                               /* The request is parented to mds_es_ctx itself. */
     158          12 :         subreq = mds_es_connect_send(
     159             :                         mds_es_ctx,
     160          12 :                         mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
     161             :                         mds_es_ctx);
     162          12 :         if (subreq == NULL) {
     163           0 :                 TALLOC_FREE(mds_es_ctx);
     164           0 :                 return false;
     165             :         }
     166          12 :         tevent_req_set_callback(subreq, mds_es_connected, mds_es_ctx);
     167          12 :         return true;
     168             : }
     169             : 
     /*
      * Completion callback for mds_es_connect_send().
      *
      * Collects the result and frees the request. A non-zero result is only
      * logged. On success the next queued search, if any, is triggered via
      * mds_es_next_search_trigger(); a failure of that call is logged too.
      */
     170           2 : static void mds_es_connected(struct tevent_req *subreq)
     171             : {
     172           2 :         struct mds_es_ctx *mds_es_ctx = tevent_req_callback_data(
     173             :                 subreq, struct mds_es_ctx);
     174             :         int ret;
     175             :         bool ok;
     176             : 
     177           2 :         ret = mds_es_connect_recv(subreq);
     178           2 :         TALLOC_FREE(subreq);
     179           2 :         if (ret != 0) {
     180           0 :                 DBG_ERR("HTTP connect failed\n");
     181           0 :                 return;
     182             :         }
     183             : 
     184           2 :         ok = mds_es_next_search_trigger(mds_es_ctx);
     185           2 :         if (!ok) {
     186           0 :                 DBG_ERR("mds_es_next_search_trigger failed\n");
     187             :         }
     188           2 :         return;
     189             : }
     190             : 
     /*
      * Per-request state of mds_es_connect_send(): the event context, the
      * owning mds_es_ctx, a copy of the configured server address and port,
      * and the TLS parameters (left NULL when TLS is not enabled). The qe
      * member is not referenced in the visible part of this file.
      */
     191             : struct mds_es_connect_state {
     192             :         struct tevent_context *ev;
     193             :         struct mds_es_ctx *mds_es_ctx;
     194             :         struct tevent_queue_entry *qe;
     195             :         const char *server_addr;
     196             :         uint16_t server_port;
     197             :         struct tstream_tls_params *tls_params;
     198             : };
     199             : 
     /* Forward declarations of the callbacks used by mds_es_connect_send(). */
     200             : static void mds_es_http_connect_done(struct tevent_req *subreq);
     201             : static void mds_es_http_waited(struct tevent_req *subreq);
     202             : 
     /*
      * Start an asynchronous HTTP(S) connection to the Elasticsearch server.
      *
      * The target is read from the "elasticsearch" parametric options of the
      * share identified by mds_es_ctx->mds_ctx->snum: "address" (default
      * "localhost"), "port" (default 9200) and "use tls" (default false).
      * When TLS is enabled, TLS client parameters are built from a temporary
      * loadparm context before connecting.
      *
      * Returns NULL only if the request itself cannot be allocated. Any later
      * failure is recorded on the request, which is then returned via
      * tevent_req_post(). Completion is handled in
      * mds_es_http_connect_done().
      */
     203          12 : static struct tevent_req *mds_es_connect_send(
     204             :                                 TALLOC_CTX *mem_ctx,
     205             :                                 struct tevent_context *ev,
     206             :                                 struct mds_es_ctx *mds_es_ctx)
     207             : {
     208          12 :         struct tevent_req *req = NULL;
     209          12 :         struct tevent_req *subreq = NULL;
     210          12 :         struct mds_es_connect_state *state = NULL;
     211          12 :         const char *server_addr = NULL;
     212             :         bool use_tls;
     213             :         NTSTATUS status;
     214             : 
     215          12 :         req = tevent_req_create(mem_ctx, &state, struct mds_es_connect_state);
     216          12 :         if (req == NULL) {
     217           0 :                 return NULL;
     218             :         }
     219          12 :         *state = (struct mds_es_connect_state) {
     220             :                 .ev = ev,
     221             :                 .mds_es_ctx = mds_es_ctx,
     222             :         };
     223             : 
     224          12 :         server_addr = lp_parm_const_string(
     225          12 :                 mds_es_ctx->mds_ctx->snum,
     226             :                 "elasticsearch",
     227             :                 "address",
     228             :                 "localhost");
                               /* Keep a private copy owned by the request state. */
     229          12 :         state->server_addr = talloc_strdup(state, server_addr);
     230          12 :         if (tevent_req_nomem(state->server_addr, req)) {
     231           0 :                 return tevent_req_post(req, ev);
     232             :         }
     233             : 
     234          24 :         state->server_port = lp_parm_int(
     235          12 :                 mds_es_ctx->mds_ctx->snum,
     236             :                 "elasticsearch",
     237             :                 "port",
     238             :                 9200);
     239             : 
     240          12 :         use_tls = lp_parm_bool(
     241          12 :                 mds_es_ctx->mds_ctx->snum,
     242             :                 "elasticsearch",
     243             :                 "use tls",
     244             :                 false);
     245             : 
     246          12 :         DBG_DEBUG("Connecting to HTTP%s [%s] port [%"PRIu16"]\n",
     247             :                   use_tls ? "S" : "", state->server_addr, state->server_port);
     248             : 
     249          12 :         if (use_tls) {
     250           0 :                 struct loadparm_context *lp_ctx = NULL;
     251             : 
     252           0 :                 lp_ctx = loadparm_init_s3(state, loadparm_s3_helpers());
     253           0 :                 if (tevent_req_nomem(lp_ctx, req)) {
     254           0 :                         return tevent_req_post(req, ev);
     255             :                 }
     256             : 
     257           0 :                 status = tstream_tls_params_client_lpcfg(state,
     258             :                                                          lp_ctx,
     259           0 :                                                          state->server_addr,
     260           0 :                                                          &state->tls_params);
                                       /* lp_ctx is only needed to build tls_params. */
     261           0 :                 TALLOC_FREE(lp_ctx);
     262           0 :                 if (!NT_STATUS_IS_OK(status)) {
     263           0 :                         DBG_ERR("Failed tstream_tls_params_client - %s\n",
     264             :                                 nt_errstr(status));
     265           0 :                         tevent_req_nterror(req, status);
     266           0 :                         return tevent_req_post(req, ev);
     267             :                 }
     268             :         }
     269             : 
                               /* state->tls_params is still NULL here when TLS is off. */
     270          12 :         subreq = http_connect_send(state,
     271          12 :                                    state->ev,
     272          12 :                                    state->server_addr,
     273          12 :                                    state->server_port,
     274          12 :                                    mds_es_ctx->mdssvc_es_ctx->creds,
     275          12 :                                    state->tls_params);
     276          12 :         if (tevent_req_nomem(subreq, req)) {
     277           0 :                 return tevent_req_post(req, ev);
     278             :         }
     279          12 :         tevent_req_set_callback(subreq, mds_es_http_connect_done, req);
     280          12 :         return req;
     281             : }
     282             : 
     /*
      * Callback for the http_connect_send() subrequest.
      *
      * On success the connection is stored in mds_es_ctx->http_conn (with
      * mds_es_ctx as its talloc parent) and the connect request is marked
      * done. On failure the error is logged and a wakeup timer 10 seconds in
      * the future is scheduled; mds_es_http_waited() then retries.
      */
     283          12 : static void mds_es_http_connect_done(struct tevent_req *subreq)
     284             : {
     285          12 :         struct tevent_req *req = tevent_req_callback_data(
     286             :                 subreq, struct tevent_req);
     287          12 :         struct mds_es_connect_state *state = tevent_req_data(
     288             :                 req, struct mds_es_connect_state);
     289             :         int error;
     290             : 
     291          12 :         error = http_connect_recv(subreq,
     292          12 :                                   state->mds_es_ctx,
     293          12 :                                   &state->mds_es_ctx->http_conn);
     294          12 :         TALLOC_FREE(subreq);
     295          12 :         if (error != 0) {
     296          10 :                 DBG_ERR("HTTP connect failed, retrying...\n");
     297             : 
                                       /* The timer is parented to mds_es_ctx, not to req. */
     298          10 :                 subreq = tevent_wakeup_send(
     299          10 :                         state->mds_es_ctx,
     300          10 :                         state->mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
     301             :                         tevent_timeval_current_ofs(10, 0));
     302          10 :                 if (tevent_req_nomem(subreq, req)) {
     303           0 :                         return;
     304             :                 }
     305          10 :                 tevent_req_set_callback(subreq,
     306             :                                         mds_es_http_waited,
     307             :                                         req);
     308          10 :                 return;
     309             :         }
     310             : 
     311           2 :         DBG_DEBUG("Connected to HTTP%s [%s] port [%"PRIu16"]\n",
     312             :                   state->tls_params ? "S" : "",
     313             :                   state->server_addr, state->server_port);
     314             : 
     315           2 :         tevent_req_done(req);
     316           2 :         return;
     317             : }
     318             : 
     /*
      * Callback for the retry wakeup timer scheduled by
      * mds_es_http_connect_done().
      *
      * If the wakeup failed, the connect request is failed with ETIMEDOUT.
      * Otherwise a brand new connect request is started on mds_es_ctx, with
      * mds_es_connected() as its completion callback.
      *
      * NOTE(review): on the retry path the original request (req) is neither
      * marked done nor failed in this function, so its own callback is not
      * invoked from here and its state stays allocated under mds_es_ctx --
      * confirm this is intended.
      */
     319           0 : static void mds_es_http_waited(struct tevent_req *subreq)
     320             : {
     321           0 :         struct tevent_req *req = tevent_req_callback_data(
     322             :                 subreq, struct tevent_req);
     323           0 :         struct mds_es_connect_state *state = tevent_req_data(
     324             :                 req, struct mds_es_connect_state);
     325             :         bool ok;
     326             : 
     327           0 :         ok = tevent_wakeup_recv(subreq);
     328           0 :         TALLOC_FREE(subreq);
     329           0 :         if (!ok) {
     330           0 :                 tevent_req_error(req, ETIMEDOUT);
     331           0 :                 return;
     332             :         }
     333             : 
     334           0 :         subreq = mds_es_connect_send(
     335           0 :                         state->mds_es_ctx,
     336           0 :                         state->mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
     337             :                         state->mds_es_ctx);
     338           0 :         if (tevent_req_nomem(subreq, req)) {
     339           0 :                 return;
     340             :         }
     341           0 :         tevent_req_set_callback(subreq, mds_es_connected, state->mds_es_ctx);
     342             : }
     343             : 
     /*
      * Receive the result of mds_es_connect_send(). Returns whatever
      * tevent_req_simple_recv_unix() reports for the request; callers in this
      * file treat 0 as success.
      */
     344           2 : static int mds_es_connect_recv(struct tevent_req *req)
     345             : {
     346           2 :         return tevent_req_simple_recv_unix(req);
     347             : }
     348             : 
     /*
      * Handle a failed search by dropping the HTTP connection and starting a
      * new connect request.
      *
      * If the search still has a query attached (s->slq != NULL), the query
      * is put into SLQ_STATE_ERROR. The existing http_conn is freed and a new
      * connect request is started with mds_es_connected() as callback. A
      * failure to create that request is only logged.
      *
      * s->mds_es_ctx is dereferenced without a NULL check; the caller visible
      * in this file (mds_es_search_done()) checks it before calling.
      */
     349           0 : static void mds_es_reconnect_on_error(struct sl_es_search *s)
     350             : {
     351           0 :         struct mds_es_ctx *mds_es_ctx = s->mds_es_ctx;
     352           0 :         struct tevent_req *subreq = NULL;
     353             : 
     354           0 :         if (s->slq != NULL) {
     355           0 :                 s->slq->state = SLQ_STATE_ERROR;
     356             :         }
     357             : 
     358           0 :         DBG_WARNING("Reconnecting HTTP...\n");
     359           0 :         TALLOC_FREE(mds_es_ctx->http_conn);
     360             : 
     361           0 :         subreq = mds_es_connect_send(
     362             :                         mds_es_ctx,
     363           0 :                         mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
     364             :                         mds_es_ctx);
     365           0 :         if (subreq == NULL) {
     366           0 :                 DBG_ERR("mds_es_connect_send failed\n");
     367           0 :                 return;
     368             :         }
     369           0 :         tevent_req_set_callback(subreq, mds_es_connected, mds_es_ctx);
     370             : }
     371             : 
     /*
      * talloc destructor for a search that is not pending: unlinks s from
      * the searches list of its mds_es_ctx. Nothing is unlinked when the back
      * pointer has already been cleared. Always returns 0.
      */
     372           2 : static int search_destructor(struct sl_es_search *s)
     373             : {
     374           2 :         if (s->mds_es_ctx == NULL) {
     375           0 :                 return 0;
     376             :         }
     377           2 :         DLIST_REMOVE(s->mds_es_ctx->searches, s);
     378           2 :         return 0;
     379             : }
     380             : 
     /*
      * Forward declarations of the search request (send/recv) and of the
      * callback that handles its completion.
      */
     381             : static struct tevent_req *mds_es_search_send(TALLOC_CTX *mem_ctx,
     382             :                                              struct tevent_context *ev,
     383             :                                              struct sl_es_search *s);
     384             : static int mds_es_search_recv(struct tevent_req *req);
     385             : static void mds_es_search_done(struct tevent_req *subreq);
     386             : 
     /*
      * Queue a Spotlight query for execution against Elasticsearch.
      *
      * Allocates a struct sl_es_search as a talloc child of slq, reads the
      * result cap from the share's "elasticsearch" / "max results" option
      * (default MAX_SL_RESULTS), translates the Spotlight query string into
      * an Elasticsearch query, appends the search to the list of searches of
      * the mds_es_ctx and marks the query as SLQ_STATE_RUNNING.
      *
      * Returns false on allocation or translation failure, otherwise the
      * result of mds_es_next_search_trigger().
      */
     387           2 : static bool mds_es_search(struct sl_query *slq)
     388             : {
     389           2 :         struct mds_es_ctx *mds_es_ctx = talloc_get_type_abort(
     390             :                 slq->mds_ctx->backend_private, struct mds_es_ctx);
     391           2 :         struct sl_es_search *s = NULL;
     392             :         bool ok;
     393             : 
     394           2 :         s = talloc_zero(slq, struct sl_es_search);
     395           2 :         if (s == NULL) {
     396           0 :                 return false;
     397             :         }
     398           2 :         *s = (struct sl_es_search) {
     399           2 :                 .ev = mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
     400             :                 .mds_es_ctx = mds_es_ctx,
     401             :                 .slq = slq,
     402             :                 .size = SL_PAGESIZE,
     403             :         };
     404             : 
     405             :         /* 0 would mean no limit */
     406           2 :         s->max = lp_parm_ulonglong(s->slq->mds_ctx->snum,
     407             :                                    "elasticsearch",
     408             :                                    "max results",
     409             :                                    MAX_SL_RESULTS);
     410             : 
     411           2 :         DBG_DEBUG("Spotlight query: '%s'\n", slq->query_string);
     412             : 
     413           2 :         ok = map_spotlight_to_es_query(
     414             :                 s,
     415           2 :                 mds_es_ctx->mdssvc_es_ctx->mappings,
     416             :                 slq->path_scope,
     417           2 :                 slq->query_string,
     418             :                 &s->es_query);
     419           2 :         if (!ok) {
     420           0 :                 TALLOC_FREE(s);
     421           0 :                 return false;
     422             :         }
     423           2 :         DBG_DEBUG("Elasticsearch query: '%s'\n", s->es_query);
     424             : 
     425           2 :         slq->backend_private = s;
     426           2 :         slq->state = SLQ_STATE_RUNNING;
     427           2 :         DLIST_ADD_END(mds_es_ctx->searches, s);
                               /*
                                * The destructor is installed only after s has been
                                * linked, so the TALLOC_FREE(s) above never unlinks.
                                */
     428           2 :         talloc_set_destructor(s, search_destructor);
     429             : 
     430           2 :         return mds_es_next_search_trigger(mds_es_ctx);
     431             : }
     432             : 
     /*
      * Start the search at the head of the searches list, if possible.
      *
      * Nothing is started, and true is returned, when there is no HTTP
      * connection yet, when the list is empty, or when the head search is
      * already pending. Otherwise a search request is created for the head
      * element, mds_es_search_done() is set as its callback and the search
      * is marked pending.
      *
      * Returns false only if the search request could not be created.
      */
     433           6 : static bool mds_es_next_search_trigger(struct mds_es_ctx *mds_es_ctx)
     434             : {
     435           6 :         struct tevent_req *subreq = NULL;
     436           6 :         struct sl_es_search *s = mds_es_ctx->searches;
     437             : 
     438           6 :         if (mds_es_ctx->http_conn == NULL) {
     439           0 :                 DBG_DEBUG("Waiting for HTTP connection...\n");
     440           0 :                 return true;
     441             :         }
     442           6 :         if (s == NULL) {
     443           4 :                 DBG_DEBUG("No pending searches, idling...\n");
     444           4 :                 return true;
     445             :         }
     446           2 :         if (s->pending) {
     447           0 :                 DBG_DEBUG("Search pending [%p]\n", s);
     448           0 :                 return true;
     449             :         }
     450             : 
                               /* The request is a talloc child of the search itself. */
     451           2 :         subreq = mds_es_search_send(s, s->ev, s);
     452           2 :         if (subreq == NULL) {
     453           0 :                 return false;
     454             :         }
     455           2 :         tevent_req_set_callback(subreq, mds_es_search_done, s);
     456           2 :         mds_es_search_set_pending(s);
     457           2 :         return true;
     458             : }
     459             : 
     /*
      * Completion callback for a search request.
      *
      * Clears the pending flag, then:
      *  - if the owning mds_es_ctx is gone (back pointer NULL), frees s and
      *    returns;
      *  - otherwise unlinks s from the searches list and collects the result;
      *  - on a non-zero result, hands over to mds_es_reconnect_on_error();
      *  - if the query was closed in the meantime (s->slq == NULL), frees s;
      *  - if s->total is 0 or s->from has reached s->max, sets
      *    SLQ_STATE_DONE;
      *  - if the query's result set already holds SL_PAGESIZE or more
      *    entries, sets SLQ_STATE_FULL;
      *  - otherwise re-queues s at the end of the list.
      * Except for the first and third case, the next search is triggered
      * before returning.
      */
     460           2 : static void mds_es_search_done(struct tevent_req *subreq)
     461             : {
     462           2 :         struct sl_es_search *s = tevent_req_callback_data(
     463             :                 subreq, struct sl_es_search);
     464           2 :         struct mds_es_ctx *mds_es_ctx = s->mds_es_ctx;
     465           2 :         struct sl_query *slq = s->slq;
     466             :         int ret;
     467             :         bool ok;
     468             : 
     469           2 :         DBG_DEBUG("Search done for search [%p]\n", s);
     470             : 
     471           2 :         mds_es_search_unset_pending(s);
     472             : 
     473           2 :         if (mds_es_ctx == NULL) {
     474             :                 /*
     475             :                  * Search connection closed by the user while s was pending.
     476             :                  */
     477           0 :                 TALLOC_FREE(s);
     478           0 :                 return;
     479             :         }
     480             : 
     481           2 :         DLIST_REMOVE(mds_es_ctx->searches, s);
     482             : 
     483           2 :         ret = mds_es_search_recv(subreq);
     484           2 :         TALLOC_FREE(subreq);
     485           2 :         if (ret != 0) {
                                       /* s is already unlinked from the list at this point. */
     486           0 :                 mds_es_reconnect_on_error(s);
     487           0 :                 return;
     488             :         }
     489             : 
     490           2 :         if (slq == NULL) {
     491             :                 /*
     492             :                  * Closed by the user. Explicitly free "s" here because the
     493             :                  * talloc parent slq is already gone.
     494             :                  */
     495           0 :                 TALLOC_FREE(s);
     496           0 :                 goto trigger;
     497             :         }
     498             : 
     499           2 :         SLQ_DEBUG(10, slq, "search done");
     500             : 
     501           2 :         if (s->total == 0 || s->from >= s->max) {
     502           2 :                 slq->state = SLQ_STATE_DONE;
     503           2 :                 goto trigger;
     504             :         }
     505             : 
     506           0 :         if (slq->query_results->num_results >= SL_PAGESIZE) {
     507           0 :                 slq->state = SLQ_STATE_FULL;
     508           0 :                 goto trigger;
     509             :         }
     510             : 
     511             :         /*
     512             :          * Reschedule this query as there are more results waiting in the
     513             :          * Elasticsearch server and the client result queue has room as
     514             :          * well. But put it at the end of the list of active queries as a simple
     515             :          * heuristic that should ensure all client queries are dispatched to the
     516             :          * server.
     517             :          */
     518           0 :         DLIST_ADD_END(mds_es_ctx->searches, s);
     519             : 
     520           2 : trigger:
     521           2 :         ok = mds_es_next_search_trigger(mds_es_ctx);
     522           2 :         if (!ok) {
     523           0 :                 DBG_ERR("mds_es_next_search_trigger failed\n");
     524             :         }
     525             : }
     526             : 
     /*
      * Forward declarations of the HTTP send/read completion callbacks of a
      * search request. Their definitions are not part of the visible listing.
      */
     527             : static void mds_es_search_http_send_done(struct tevent_req *subreq);
     528             : static void mds_es_search_http_read_done(struct tevent_req *subreq);
     529             : 
     /*
      * Per-request state of mds_es_search_send(): the event context, the
      * search being executed, the outgoing HTTP request (embedded) and a
      * pointer to the HTTP response. How qe and http_response are used is
      * not shown in the visible part of this file.
      */
     530             : struct mds_es_search_state {
     531             :         struct tevent_context *ev;
     532             :         struct sl_es_search *s;
     533             :         struct tevent_queue_entry *qe;
     534             :         struct http_request http_request;
     535             :         struct http_request *http_response;
     536             : };
     537             : 
     /*
      * talloc destructor installed while a search is pending (see
      * mds_es_search_set_pending()).
      *
      * Detaches the search from its query by clearing s->slq and returns -1,
      * which by talloc's destructor convention refuses the free, so s stays
      * allocated until the HTTP response has been processed.
      */
     538           0 : static int mds_es_search_pending_destructor(struct sl_es_search *s)
     539             : {
     540             :         /*
     541             :          * s is a child of slq which may get freed when a user closes a
     542             :          * query. To maintain the HTTP request/response sequence on the HTTP
     543             :          * channel, we keep processing pending requests and free s when we
     544             :          * receive the HTTP response for pending requests.
     545             :          */
     546           0 :         DBG_DEBUG("Preserving pending search [%p]\n", s);
     547           0 :         s->slq = NULL;
     548           0 :         return -1;
     549             : }
     550             : 
     /*
      * Mark a search as pending and switch its talloc destructor to
      * mds_es_search_pending_destructor(). s->slq is passed to SLQ_DEBUG
      * without a NULL check here.
      */
     551           2 : static void mds_es_search_set_pending(struct sl_es_search *s)
     552             : {
     553           2 :         DBG_DEBUG("Set pending [%p]\n", s);
     554           2 :         SLQ_DEBUG(10, s->slq, "pending");
     555             : 
     556           2 :         s->pending = true;
     557           2 :         talloc_set_destructor(s, mds_es_search_pending_destructor);
     558           2 : }
     559             : 
     /*
      * Clear the pending flag of a search and restore search_destructor() as
      * its talloc destructor. The query debug message is skipped when the
      * query has already been detached (s->slq == NULL).
      */
     560           2 : static void mds_es_search_unset_pending(struct sl_es_search *s)
     561             : {
     562           2 :         DBG_DEBUG("Unset pending [%p]\n", s);
     563           2 :         if (s->slq != NULL) {
     564           2 :                 SLQ_DEBUG(10, s->slq, "unset pending");
     565             :         }
     566             : 
     567           2 :         s->pending = false;
     568           2 :         talloc_set_destructor(s, search_destructor);
     569           2 : }
     570             : 
     571           2 : static struct tevent_req *mds_es_search_send(TALLOC_CTX *mem_ctx,
     572             :                                               struct tevent_context *ev,
     573             :                                               struct sl_es_search *s)
     574             : {
     575           2 :         struct tevent_req *req = NULL;
     576           2 :         struct tevent_req *subreq = NULL;
     577           2 :         struct mds_es_search_state *state = NULL;
     578           2 :         const char *index = NULL;
     579           2 :         char *elastic_query = NULL;
     580           2 :         char *uri = NULL;
     581             :         size_t elastic_query_len;
     582           2 :         char *elastic_query_len_str = NULL;
     583           2 :         char *hostname = NULL;
     584           2 :         bool pretty = false;
     585             : 
     586           2 :         req = tevent_req_create(mem_ctx, &state, struct mds_es_search_state);
     587           2 :         if (req == NULL) {
     588           0 :                 return NULL;
     589             :         }
     590           2 :         *state = (struct mds_es_search_state) {
     591             :                 .ev = ev,
     592             :                 .s = s,
     593             :         };
     594             : 
     595           2 :         if (!tevent_req_set_endtime(req, ev, timeval_current_ofs(60, 0))) {
     596           0 :                 return tevent_req_post(req, s->ev);
     597             :         }
     598             : 
     599           2 :         index = lp_parm_const_string(s->slq->mds_ctx->snum,
     600             :                                      "elasticsearch",
     601             :                                      "index",
     602             :                                      "_all");
     603           2 :         if (tevent_req_nomem(index, req)) {
     604           0 :                 return tevent_req_post(req, ev);
     605             :         }
     606             : 
     607           2 :         if (DEBUGLVL(10)) {
     608           0 :                 pretty = true;
     609             :         }
     610             : 
     611           2 :         uri = talloc_asprintf(state,
     612             :                               "/%s/_search%s",
     613             :                               index,
     614             :                               pretty ? "?pretty" : "");
     615           2 :         if (tevent_req_nomem(uri, req)) {
     616           0 :                 return tevent_req_post(req, ev);
     617             :         }
     618             : 
     619           2 :         elastic_query = talloc_asprintf(state,
     620             :                                         MDSSVC_ELASTIC_QUERY_TEMPLATE,
     621             :                                         s->from,
     622             :                                         s->size,
     623             :                                         MDSSVC_ELASTIC_SOURCES,
     624             :                                         s->es_query);
     625           2 :         if (tevent_req_nomem(elastic_query, req)) {
     626           0 :                 return tevent_req_post(req, ev);
     627             :         }
     628           2 :         DBG_DEBUG("Elastic query: '%s'\n", elastic_query);
     629             : 
     630           2 :         elastic_query_len = strlen(elastic_query);
     631             : 
     632           4 :         state->http_request = (struct http_request) {
     633             :                 .type = HTTP_REQ_POST,
     634             :                 .uri = uri,
     635           2 :                 .body = data_blob_const(elastic_query, elastic_query_len),
     636             :                 .major = '1',
     637             :                 .minor = '1',
     638             :         };
     639             : 
     640           2 :         elastic_query_len_str = talloc_asprintf(state, "%zu", elastic_query_len);
     641           2 :         if (tevent_req_nomem(elastic_query_len_str, req)) {
     642           0 :                 return tevent_req_post(req, ev);
     643             :         }
     644             : 
     645           2 :         hostname = get_myname(state);
     646           2 :         if (tevent_req_nomem(hostname, req)) {
     647           0 :                 return tevent_req_post(req, ev);
     648             :         }
     649             : 
     650           2 :         http_add_header(state, &state->http_request.headers,
     651             :                         "Content-Type",       "application/json");
     652           2 :         http_add_header(state, &state->http_request.headers,
     653             :                         "Accept", "application/json");
     654           2 :         http_add_header(state, &state->http_request.headers,
     655             :                         "User-Agent", "Samba/mdssvc");
     656           2 :         http_add_header(state, &state->http_request.headers,
     657             :                         "Host", hostname);
     658           2 :         http_add_header(state, &state->http_request.headers,
     659             :                         "Content-Length", elastic_query_len_str);
     660             : 
     661           2 :         subreq = http_send_request_send(state,
     662             :                                         ev,
     663           2 :                                         s->mds_es_ctx->http_conn,
     664           2 :                                         &state->http_request);
     665           2 :         if (tevent_req_nomem(subreq, req)) {
     666           0 :                 return tevent_req_post(req, ev);
     667             :         }
     668           2 :         tevent_req_set_callback(subreq, mds_es_search_http_send_done, req);
     669           2 :         return req;
     670             : }
     671             : 
     672           2 : static void mds_es_search_http_send_done(struct tevent_req *subreq)
     673             : {
     674           2 :         struct tevent_req *req = tevent_req_callback_data(
     675             :                 subreq, struct tevent_req);
     676           2 :         struct mds_es_search_state *state = tevent_req_data(
     677             :                 req, struct mds_es_search_state);
     678             :         NTSTATUS status;
     679             : 
     680           2 :         DBG_DEBUG("Sent out search [%p]\n", state->s);
     681             : 
     682           2 :         status = http_send_request_recv(subreq);
     683           2 :         TALLOC_FREE(subreq);
     684           2 :         if (!NT_STATUS_IS_OK(status)) {
     685           0 :                 tevent_req_error(req, map_errno_from_nt_status(status));
     686           0 :                 return;
     687             :         }
     688             : 
     689           2 :         if (state->s->mds_es_ctx == NULL || state->s->slq == NULL) {
     690           0 :                 tevent_req_done(req);
     691           0 :                 return;
     692             :         }
     693             : 
     694           2 :         subreq = http_read_response_send(state,
     695             :                                          state->ev,
     696           2 :                                          state->s->mds_es_ctx->http_conn,
     697             :                                          SL_PAGESIZE * 8192);
     698           2 :         if (tevent_req_nomem(subreq, req)) {
     699           0 :                 return;
     700             :         }
     701           2 :         tevent_req_set_callback(subreq, mds_es_search_http_read_done, req);
     702             : }
     703             : 
     704           2 : static void mds_es_search_http_read_done(struct tevent_req *subreq)
     705             : {
     706           2 :         struct tevent_req *req = tevent_req_callback_data(
     707             :                 subreq, struct tevent_req);
     708           2 :         struct mds_es_search_state *state = tevent_req_data(
     709             :                 req, struct mds_es_search_state);
     710           2 :         struct sl_es_search *s = state->s;
     711           2 :         struct sl_query *slq = s->slq;
     712           2 :         json_t *root = NULL;
     713           2 :         json_t *matches = NULL;
     714           2 :         json_t *match = NULL;
     715             :         size_t i;
     716             :         json_error_t error;
     717             :         size_t hits;
     718             :         NTSTATUS status;
     719             :         int ret;
     720             :         bool ok;
     721             : 
     722           2 :         DBG_DEBUG("Got response for search [%p]\n", s);
     723             : 
     724           2 :         status = http_read_response_recv(subreq, state, &state->http_response);
     725           2 :         TALLOC_FREE(subreq);
     726           2 :         if (!NT_STATUS_IS_OK(status)) {
     727           0 :                 DBG_DEBUG("HTTP response failed: %s\n", nt_errstr(status));
     728           0 :                 tevent_req_error(req, map_errno_from_nt_status(status));
     729           0 :                 return;
     730             :         }
     731             : 
     732           2 :         if (slq == NULL || s->mds_es_ctx == NULL) {
     733           0 :                 tevent_req_done(req);
     734           0 :                 return;
     735             :         }
     736             : 
     737           2 :         switch (state->http_response->response_code) {
     738           2 :         case 200:
     739           2 :                 break;
     740           0 :         default:
     741           0 :                 DBG_ERR("HTTP server response: %u\n",
     742             :                         state->http_response->response_code);
     743           0 :                 goto fail;
     744             :         }
     745             : 
     746           2 :         DBG_DEBUG("JSON response:\n%s\n",
     747             :                   talloc_strndup(talloc_tos(),
     748             :                                  (char *)state->http_response->body.data,
     749             :                                  state->http_response->body.length));
     750             : 
     751           2 :         root = json_loadb((char *)state->http_response->body.data,
     752           2 :                           state->http_response->body.length,
     753             :                           0,
     754             :                           &error);
     755           2 :         if (root == NULL) {
     756           0 :                 DBG_ERR("json_loadb failed\n");
     757           0 :                 goto fail;
     758             :         }
     759             : 
     760           2 :         if (s->total == 0) {
     761             :                 /*
     762             :                  * Get the total number of results the first time, format
     763             :                  * used by Elasticsearch 7.0 or newer
     764             :                  */
     765           2 :                 ret = json_unpack(root, "{s: {s: {s: i}}}",
     766             :                                   "hits", "total", "value", &s->total);
     767           2 :                 if (ret != 0) {
     768             :                         /* Format used before 7.0 */
     769           0 :                         ret = json_unpack(root, "{s: {s: i}}",
     770             :                                           "hits", "total", &s->total);
     771           0 :                         if (ret != 0) {
     772           0 :                                 DBG_ERR("json_unpack failed\n");
     773           0 :                                 goto fail;
     774             :                         }
     775             :                 }
     776             : 
     777           2 :                 DBG_DEBUG("Total: %zu\n", s->total);
     778             : 
     779           2 :                 if (s->total == 0) {
     780           0 :                         json_decref(root);
     781           0 :                         tevent_req_done(req);
     782           0 :                         return;
     783             :                 }
     784             :         }
     785             : 
     786           2 :         if (s->max == 0 || s->max > s->total) {
     787           2 :                 s->max = s->total;
     788             :         }
     789             : 
     790           2 :         ret = json_unpack(root, "{s: {s:o}}",
     791             :                           "hits", "hits", &matches);
     792           2 :         if (ret != 0 || matches == NULL) {
     793           0 :                 DBG_ERR("json_unpack hits failed\n");
     794           0 :                 goto fail;
     795             :         }
     796             : 
     797           2 :         hits = json_array_size(matches);
     798           2 :         if (hits == 0) {
     799           0 :                 DBG_ERR("Hu?! No results?\n");
     800           0 :                 goto fail;
     801             :         }
     802           2 :         DBG_DEBUG("Hits: %zu\n", hits);
     803             : 
     804           6 :         for (i = 0; i < hits && s->from + i < s->max; i++) {
     805           4 :                 const char *path = NULL;
     806             : 
     807           4 :                 match = json_array_get(matches, i);
     808           4 :                 if (match == NULL) {
     809           0 :                         DBG_ERR("Hu?! No value for index %zu\n", i);
     810           0 :                         goto fail;
     811             :                 }
     812           4 :                 ret = json_unpack(match,
     813             :                                   "{s: {s: {s: s}}}",
     814             :                                   "_source",
     815             :                                   "path",
     816             :                                   "real",
     817             :                                   &path);
     818           4 :                 if (ret != 0) {
     819           0 :                         DBG_ERR("Missing path.real in JSON result\n");
     820           0 :                         goto fail;
     821             :                 }
     822             : 
     823           4 :                 ok = mds_add_result(slq, path);
     824           4 :                 if (!ok) {
     825           0 :                         DBG_ERR("error adding result for path: %s\n", path);
     826           0 :                         goto fail;
     827             :                 }
     828             :         }
     829           2 :         json_decref(root);
     830             : 
     831           2 :         s->from += hits;
     832           2 :         slq->state = SLQ_STATE_RESULTS;
     833           2 :         tevent_req_done(req);
     834           2 :         return;
     835             : 
     836           0 : fail:
     837           0 :         if (root != NULL) {
     838           0 :                 json_decref(root);
     839             :         }
     840           0 :         slq->state = SLQ_STATE_ERROR;
     841           0 :         tevent_req_error(req, EINVAL);
     842           0 :         return;
     843             : }
     844             : 
     845           2 : static int mds_es_search_recv(struct tevent_req *req)
     846             : {
     847           2 :         return tevent_req_simple_recv_unix(req);
     848             : }
     849             : 
     850           0 : static bool mds_es_search_cont(struct sl_query *slq)
     851             : {
     852           0 :         struct sl_es_search *s = talloc_get_type_abort(
     853             :                 slq->backend_private, struct sl_es_search);
     854             : 
     855           0 :         SLQ_DEBUG(10, slq, "continue");
     856           0 :         DLIST_ADD_END(s->mds_es_ctx->searches, s);
     857           0 :         return mds_es_next_search_trigger(s->mds_es_ctx);
     858             : }
     859             : 
     860             : struct mdssvc_backend mdsscv_backend_es = {
     861             :         .init = mdssvc_es_init,
     862             :         .shutdown = mdssvc_es_shutdown,
     863             :         .connect = mds_es_connect,
     864             :         .search_start = mds_es_search,
     865             :         .search_cont = mds_es_search_cont,
     866             : };

Generated by: LCOV version 1.14