Line data Source code
1 : /*
2 : Unix SMB/CIFS implementation.
3 : Main metadata server / Spotlight routines / ES backend
4 :
5 : Copyright (C) Ralph Boehme 2019
6 :
7 : This program is free software; you can redistribute it and/or modify
8 : it under the terms of the GNU General Public License as published by
9 : the Free Software Foundation; either version 3 of the License, or
10 : (at your option) any later version.
11 :
12 : This program is distributed in the hope that it will be useful,
13 : but WITHOUT ANY WARRANTY; without even the implied warranty of
14 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 : GNU General Public License for more details.
16 :
17 : You should have received a copy of the GNU General Public License
18 : along with this program. If not, see <http://www.gnu.org/licenses/>.
19 : */
20 :
21 : #include "includes.h"
22 : #include "system/filesys.h"
23 : #include "lib/util/time_basic.h"
24 : #include "lib/tls/tls.h"
25 : #include "lib/util/tevent_ntstatus.h"
26 : #include "libcli/http/http.h"
27 : #include "lib/util/tevent_unix.h"
28 : #include "credentials.h"
29 : #include "mdssvc.h"
30 : #include "mdssvc_es.h"
31 : #include "rpc_server/mdssvc/es_parser.tab.h"
32 : #include "lib/param/param.h"
33 :
34 : #include <jansson.h>
35 :
36 : #undef DBGC_CLASS
37 : #define DBGC_CLASS DBGC_RPC_SRV
38 :
39 : #define MDSSVC_ELASTIC_QUERY_TEMPLATE \
40 : "{" \
41 : " \"from\": %zu," \
42 : " \"size\": %zu," \
43 : " \"_source\": [%s]," \
44 : " \"query\": {" \
45 : " \"query_string\": {" \
46 : " \"query\": \"%s\"" \
47 : " }" \
48 : " }" \
49 : "}"
50 :
51 : #define MDSSVC_ELASTIC_SOURCES \
52 : "\"path.real\""
53 :
54 4 : static bool mdssvc_es_init(struct mdssvc_ctx *mdssvc_ctx)
55 : {
56 4 : struct mdssvc_es_ctx *mdssvc_es_ctx = NULL;
57 : json_error_t json_error;
58 4 : char *default_path = NULL;
59 4 : const char *path = NULL;
60 :
61 4 : mdssvc_es_ctx = talloc_zero(mdssvc_ctx, struct mdssvc_es_ctx);
62 4 : if (mdssvc_es_ctx == NULL) {
63 0 : return false;
64 : }
65 4 : mdssvc_es_ctx->mdssvc_ctx = mdssvc_ctx;
66 :
67 4 : mdssvc_es_ctx->creds = cli_credentials_init_anon(mdssvc_es_ctx);
68 4 : if (mdssvc_es_ctx->creds == NULL) {
69 0 : TALLOC_FREE(mdssvc_es_ctx);
70 0 : return false;
71 : }
72 :
73 4 : default_path = talloc_asprintf(
74 : mdssvc_es_ctx,
75 : "%s/mdssvc/elasticsearch_mappings.json",
76 : get_dyn_SAMBA_DATADIR());
77 4 : if (default_path == NULL) {
78 0 : TALLOC_FREE(mdssvc_es_ctx);
79 0 : return false;
80 : }
81 :
82 4 : path = lp_parm_const_string(GLOBAL_SECTION_SNUM,
83 : "elasticsearch",
84 : "mappings",
85 : default_path);
86 4 : if (path == NULL) {
87 0 : TALLOC_FREE(mdssvc_es_ctx);
88 0 : return false;
89 : }
90 :
91 4 : mdssvc_es_ctx->mappings = json_load_file(path, 0, &json_error);
92 4 : if (mdssvc_es_ctx->mappings == NULL) {
93 0 : DBG_ERR("Opening mapping file [%s] failed: %s\n",
94 : path, json_error.text);
95 0 : TALLOC_FREE(mdssvc_es_ctx);
96 0 : return false;
97 : }
98 4 : TALLOC_FREE(default_path);
99 :
100 4 : mdssvc_ctx->backend_private = mdssvc_es_ctx;
101 4 : return true;
102 : }
103 :
104 4 : static bool mdssvc_es_shutdown(struct mdssvc_ctx *mdssvc_ctx)
105 : {
106 4 : return true;
107 : }
108 :
109 : static struct tevent_req *mds_es_connect_send(
110 : TALLOC_CTX *mem_ctx,
111 : struct tevent_context *ev,
112 : struct mds_es_ctx *mds_es_ctx);
113 : static int mds_es_connect_recv(struct tevent_req *req);
114 : static void mds_es_connected(struct tevent_req *subreq);
115 : static bool mds_es_next_search_trigger(struct mds_es_ctx *mds_es_ctx);
116 : static void mds_es_search_set_pending(struct sl_es_search *s);
117 : static void mds_es_search_unset_pending(struct sl_es_search *s);
118 :
119 12 : static int mds_es_ctx_destructor(struct mds_es_ctx *mds_es_ctx)
120 : {
121 12 : struct sl_es_search *s = mds_es_ctx->searches;
122 :
123 : /*
124 : * The per tree-connect state mds_es_ctx (a child of mds_ctx) is about
125 : * to go away and has already freed all waiting searches. If there's a
126 : * search remaining that's when the search is already active. Reset the
127 : * mds_es_ctx pointer, so we can detect this when the search completes.
128 : */
129 :
130 12 : if (s == NULL) {
131 12 : return 0;
132 : }
133 :
134 0 : s->mds_es_ctx = NULL;
135 :
136 0 : return 0;
137 : }
138 :
139 12 : static bool mds_es_connect(struct mds_ctx *mds_ctx)
140 : {
141 12 : struct mdssvc_es_ctx *mdssvc_es_ctx = talloc_get_type_abort(
142 : mds_ctx->mdssvc_ctx->backend_private, struct mdssvc_es_ctx);
143 12 : struct mds_es_ctx *mds_es_ctx = NULL;
144 12 : struct tevent_req *subreq = NULL;
145 :
146 12 : mds_es_ctx = talloc_zero(mds_ctx, struct mds_es_ctx);
147 12 : if (mds_es_ctx == NULL) {
148 0 : return false;
149 : }
150 12 : *mds_es_ctx = (struct mds_es_ctx) {
151 : .mdssvc_es_ctx = mdssvc_es_ctx,
152 : .mds_ctx = mds_ctx,
153 : };
154 :
155 12 : mds_ctx->backend_private = mds_es_ctx;
156 12 : talloc_set_destructor(mds_es_ctx, mds_es_ctx_destructor);
157 :
158 12 : subreq = mds_es_connect_send(
159 : mds_es_ctx,
160 12 : mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
161 : mds_es_ctx);
162 12 : if (subreq == NULL) {
163 0 : TALLOC_FREE(mds_es_ctx);
164 0 : return false;
165 : }
166 12 : tevent_req_set_callback(subreq, mds_es_connected, mds_es_ctx);
167 12 : return true;
168 : }
169 :
170 2 : static void mds_es_connected(struct tevent_req *subreq)
171 : {
172 2 : struct mds_es_ctx *mds_es_ctx = tevent_req_callback_data(
173 : subreq, struct mds_es_ctx);
174 : int ret;
175 : bool ok;
176 :
177 2 : ret = mds_es_connect_recv(subreq);
178 2 : TALLOC_FREE(subreq);
179 2 : if (ret != 0) {
180 0 : DBG_ERR("HTTP connect failed\n");
181 0 : return;
182 : }
183 :
184 2 : ok = mds_es_next_search_trigger(mds_es_ctx);
185 2 : if (!ok) {
186 0 : DBG_ERR("mds_es_next_search_trigger failed\n");
187 : }
188 2 : return;
189 : }
190 :
191 : struct mds_es_connect_state {
192 : struct tevent_context *ev;
193 : struct mds_es_ctx *mds_es_ctx;
194 : struct tevent_queue_entry *qe;
195 : const char *server_addr;
196 : uint16_t server_port;
197 : struct tstream_tls_params *tls_params;
198 : };
199 :
200 : static void mds_es_http_connect_done(struct tevent_req *subreq);
201 : static void mds_es_http_waited(struct tevent_req *subreq);
202 :
203 12 : static struct tevent_req *mds_es_connect_send(
204 : TALLOC_CTX *mem_ctx,
205 : struct tevent_context *ev,
206 : struct mds_es_ctx *mds_es_ctx)
207 : {
208 12 : struct tevent_req *req = NULL;
209 12 : struct tevent_req *subreq = NULL;
210 12 : struct mds_es_connect_state *state = NULL;
211 12 : const char *server_addr = NULL;
212 : bool use_tls;
213 : NTSTATUS status;
214 :
215 12 : req = tevent_req_create(mem_ctx, &state, struct mds_es_connect_state);
216 12 : if (req == NULL) {
217 0 : return NULL;
218 : }
219 12 : *state = (struct mds_es_connect_state) {
220 : .ev = ev,
221 : .mds_es_ctx = mds_es_ctx,
222 : };
223 :
224 12 : server_addr = lp_parm_const_string(
225 12 : mds_es_ctx->mds_ctx->snum,
226 : "elasticsearch",
227 : "address",
228 : "localhost");
229 12 : state->server_addr = talloc_strdup(state, server_addr);
230 12 : if (tevent_req_nomem(state->server_addr, req)) {
231 0 : return tevent_req_post(req, ev);
232 : }
233 :
234 24 : state->server_port = lp_parm_int(
235 12 : mds_es_ctx->mds_ctx->snum,
236 : "elasticsearch",
237 : "port",
238 : 9200);
239 :
240 12 : use_tls = lp_parm_bool(
241 12 : mds_es_ctx->mds_ctx->snum,
242 : "elasticsearch",
243 : "use tls",
244 : false);
245 :
246 12 : DBG_DEBUG("Connecting to HTTP%s [%s] port [%"PRIu16"]\n",
247 : use_tls ? "S" : "", state->server_addr, state->server_port);
248 :
249 12 : if (use_tls) {
250 0 : struct loadparm_context *lp_ctx = NULL;
251 :
252 0 : lp_ctx = loadparm_init_s3(state, loadparm_s3_helpers());
253 0 : if (tevent_req_nomem(lp_ctx, req)) {
254 0 : return tevent_req_post(req, ev);
255 : }
256 :
257 0 : status = tstream_tls_params_client_lpcfg(state,
258 : lp_ctx,
259 0 : state->server_addr,
260 0 : &state->tls_params);
261 0 : TALLOC_FREE(lp_ctx);
262 0 : if (!NT_STATUS_IS_OK(status)) {
263 0 : DBG_ERR("Failed tstream_tls_params_client - %s\n",
264 : nt_errstr(status));
265 0 : tevent_req_nterror(req, status);
266 0 : return tevent_req_post(req, ev);
267 : }
268 : }
269 :
270 12 : subreq = http_connect_send(state,
271 12 : state->ev,
272 12 : state->server_addr,
273 12 : state->server_port,
274 12 : mds_es_ctx->mdssvc_es_ctx->creds,
275 12 : state->tls_params);
276 12 : if (tevent_req_nomem(subreq, req)) {
277 0 : return tevent_req_post(req, ev);
278 : }
279 12 : tevent_req_set_callback(subreq, mds_es_http_connect_done, req);
280 12 : return req;
281 : }
282 :
283 12 : static void mds_es_http_connect_done(struct tevent_req *subreq)
284 : {
285 12 : struct tevent_req *req = tevent_req_callback_data(
286 : subreq, struct tevent_req);
287 12 : struct mds_es_connect_state *state = tevent_req_data(
288 : req, struct mds_es_connect_state);
289 : int error;
290 :
291 12 : error = http_connect_recv(subreq,
292 12 : state->mds_es_ctx,
293 12 : &state->mds_es_ctx->http_conn);
294 12 : TALLOC_FREE(subreq);
295 12 : if (error != 0) {
296 10 : DBG_ERR("HTTP connect failed, retrying...\n");
297 :
298 10 : subreq = tevent_wakeup_send(
299 10 : state->mds_es_ctx,
300 10 : state->mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
301 : tevent_timeval_current_ofs(10, 0));
302 10 : if (tevent_req_nomem(subreq, req)) {
303 0 : return;
304 : }
305 10 : tevent_req_set_callback(subreq,
306 : mds_es_http_waited,
307 : req);
308 10 : return;
309 : }
310 :
311 2 : DBG_DEBUG("Connected to HTTP%s [%s] port [%"PRIu16"]\n",
312 : state->tls_params ? "S" : "",
313 : state->server_addr, state->server_port);
314 :
315 2 : tevent_req_done(req);
316 2 : return;
317 : }
318 :
319 0 : static void mds_es_http_waited(struct tevent_req *subreq)
320 : {
321 0 : struct tevent_req *req = tevent_req_callback_data(
322 : subreq, struct tevent_req);
323 0 : struct mds_es_connect_state *state = tevent_req_data(
324 : req, struct mds_es_connect_state);
325 : bool ok;
326 :
327 0 : ok = tevent_wakeup_recv(subreq);
328 0 : TALLOC_FREE(subreq);
329 0 : if (!ok) {
330 0 : tevent_req_error(req, ETIMEDOUT);
331 0 : return;
332 : }
333 :
334 0 : subreq = mds_es_connect_send(
335 0 : state->mds_es_ctx,
336 0 : state->mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
337 : state->mds_es_ctx);
338 0 : if (tevent_req_nomem(subreq, req)) {
339 0 : return;
340 : }
341 0 : tevent_req_set_callback(subreq, mds_es_connected, state->mds_es_ctx);
342 : }
343 :
344 2 : static int mds_es_connect_recv(struct tevent_req *req)
345 : {
346 2 : return tevent_req_simple_recv_unix(req);
347 : }
348 :
349 0 : static void mds_es_reconnect_on_error(struct sl_es_search *s)
350 : {
351 0 : struct mds_es_ctx *mds_es_ctx = s->mds_es_ctx;
352 0 : struct tevent_req *subreq = NULL;
353 :
354 0 : if (s->slq != NULL) {
355 0 : s->slq->state = SLQ_STATE_ERROR;
356 : }
357 :
358 0 : DBG_WARNING("Reconnecting HTTP...\n");
359 0 : TALLOC_FREE(mds_es_ctx->http_conn);
360 :
361 0 : subreq = mds_es_connect_send(
362 : mds_es_ctx,
363 0 : mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
364 : mds_es_ctx);
365 0 : if (subreq == NULL) {
366 0 : DBG_ERR("mds_es_connect_send failed\n");
367 0 : return;
368 : }
369 0 : tevent_req_set_callback(subreq, mds_es_connected, mds_es_ctx);
370 : }
371 :
372 2 : static int search_destructor(struct sl_es_search *s)
373 : {
374 2 : if (s->mds_es_ctx == NULL) {
375 0 : return 0;
376 : }
377 2 : DLIST_REMOVE(s->mds_es_ctx->searches, s);
378 2 : return 0;
379 : }
380 :
381 : static struct tevent_req *mds_es_search_send(TALLOC_CTX *mem_ctx,
382 : struct tevent_context *ev,
383 : struct sl_es_search *s);
384 : static int mds_es_search_recv(struct tevent_req *req);
385 : static void mds_es_search_done(struct tevent_req *subreq);
386 :
387 2 : static bool mds_es_search(struct sl_query *slq)
388 : {
389 2 : struct mds_es_ctx *mds_es_ctx = talloc_get_type_abort(
390 : slq->mds_ctx->backend_private, struct mds_es_ctx);
391 2 : struct sl_es_search *s = NULL;
392 : bool ok;
393 :
394 2 : s = talloc_zero(slq, struct sl_es_search);
395 2 : if (s == NULL) {
396 0 : return false;
397 : }
398 2 : *s = (struct sl_es_search) {
399 2 : .ev = mds_es_ctx->mdssvc_es_ctx->mdssvc_ctx->ev_ctx,
400 : .mds_es_ctx = mds_es_ctx,
401 : .slq = slq,
402 : .size = SL_PAGESIZE,
403 : };
404 :
405 : /* 0 would mean no limit */
406 2 : s->max = lp_parm_ulonglong(s->slq->mds_ctx->snum,
407 : "elasticsearch",
408 : "max results",
409 : MAX_SL_RESULTS);
410 :
411 2 : DBG_DEBUG("Spotlight query: '%s'\n", slq->query_string);
412 :
413 2 : ok = map_spotlight_to_es_query(
414 : s,
415 2 : mds_es_ctx->mdssvc_es_ctx->mappings,
416 : slq->path_scope,
417 2 : slq->query_string,
418 : &s->es_query);
419 2 : if (!ok) {
420 0 : TALLOC_FREE(s);
421 0 : return false;
422 : }
423 2 : DBG_DEBUG("Elasticsearch query: '%s'\n", s->es_query);
424 :
425 2 : slq->backend_private = s;
426 2 : slq->state = SLQ_STATE_RUNNING;
427 2 : DLIST_ADD_END(mds_es_ctx->searches, s);
428 2 : talloc_set_destructor(s, search_destructor);
429 :
430 2 : return mds_es_next_search_trigger(mds_es_ctx);
431 : }
432 :
433 6 : static bool mds_es_next_search_trigger(struct mds_es_ctx *mds_es_ctx)
434 : {
435 6 : struct tevent_req *subreq = NULL;
436 6 : struct sl_es_search *s = mds_es_ctx->searches;
437 :
438 6 : if (mds_es_ctx->http_conn == NULL) {
439 0 : DBG_DEBUG("Waiting for HTTP connection...\n");
440 0 : return true;
441 : }
442 6 : if (s == NULL) {
443 4 : DBG_DEBUG("No pending searches, idling...\n");
444 4 : return true;
445 : }
446 2 : if (s->pending) {
447 0 : DBG_DEBUG("Search pending [%p]\n", s);
448 0 : return true;
449 : }
450 :
451 2 : subreq = mds_es_search_send(s, s->ev, s);
452 2 : if (subreq == NULL) {
453 0 : return false;
454 : }
455 2 : tevent_req_set_callback(subreq, mds_es_search_done, s);
456 2 : mds_es_search_set_pending(s);
457 2 : return true;
458 : }
459 :
460 2 : static void mds_es_search_done(struct tevent_req *subreq)
461 : {
462 2 : struct sl_es_search *s = tevent_req_callback_data(
463 : subreq, struct sl_es_search);
464 2 : struct mds_es_ctx *mds_es_ctx = s->mds_es_ctx;
465 2 : struct sl_query *slq = s->slq;
466 : int ret;
467 : bool ok;
468 :
469 2 : DBG_DEBUG("Search done for search [%p]\n", s);
470 :
471 2 : mds_es_search_unset_pending(s);
472 :
473 2 : if (mds_es_ctx == NULL) {
474 : /*
475 : * Search connection closed by the user while s was pending.
476 : */
477 0 : TALLOC_FREE(s);
478 0 : return;
479 : }
480 :
481 2 : DLIST_REMOVE(mds_es_ctx->searches, s);
482 :
483 2 : ret = mds_es_search_recv(subreq);
484 2 : TALLOC_FREE(subreq);
485 2 : if (ret != 0) {
486 0 : mds_es_reconnect_on_error(s);
487 0 : return;
488 : }
489 :
490 2 : if (slq == NULL) {
491 : /*
492 : * Closed by the user. Explicitly free "s" here because the
493 : * talloc parent slq is already gone.
494 : */
495 0 : TALLOC_FREE(s);
496 0 : goto trigger;
497 : }
498 :
499 2 : SLQ_DEBUG(10, slq, "search done");
500 :
501 2 : if (s->total == 0 || s->from >= s->max) {
502 2 : slq->state = SLQ_STATE_DONE;
503 2 : goto trigger;
504 : }
505 :
506 0 : if (slq->query_results->num_results >= SL_PAGESIZE) {
507 0 : slq->state = SLQ_STATE_FULL;
508 0 : goto trigger;
509 : }
510 :
511 : /*
512 : * Reschedule this query as there are more results waiting in the
513 : * Elasticsearch server and the client result queue has room as
514 : * well. But put it at the end of the list of active queries as a simple
515 : * heuristic that should ensure all client queries are dispatched to the
516 : * server.
517 : */
518 0 : DLIST_ADD_END(mds_es_ctx->searches, s);
519 :
520 2 : trigger:
521 2 : ok = mds_es_next_search_trigger(mds_es_ctx);
522 2 : if (!ok) {
523 0 : DBG_ERR("mds_es_next_search_trigger failed\n");
524 : }
525 : }
526 :
527 : static void mds_es_search_http_send_done(struct tevent_req *subreq);
528 : static void mds_es_search_http_read_done(struct tevent_req *subreq);
529 :
530 : struct mds_es_search_state {
531 : struct tevent_context *ev;
532 : struct sl_es_search *s;
533 : struct tevent_queue_entry *qe;
534 : struct http_request http_request;
535 : struct http_request *http_response;
536 : };
537 :
538 0 : static int mds_es_search_pending_destructor(struct sl_es_search *s)
539 : {
540 : /*
541 : * s is a child of slq which may get freed when a user closes a
542 : * query. To maintain the HTTP request/response sequence on the HTTP
543 : * channel, we keep processing pending requests and free s when we
544 : * receive the HTTP response for pending requests.
545 : */
546 0 : DBG_DEBUG("Preserving pending search [%p]\n", s);
547 0 : s->slq = NULL;
548 0 : return -1;
549 : }
550 :
551 2 : static void mds_es_search_set_pending(struct sl_es_search *s)
552 : {
553 2 : DBG_DEBUG("Set pending [%p]\n", s);
554 2 : SLQ_DEBUG(10, s->slq, "pending");
555 :
556 2 : s->pending = true;
557 2 : talloc_set_destructor(s, mds_es_search_pending_destructor);
558 2 : }
559 :
560 2 : static void mds_es_search_unset_pending(struct sl_es_search *s)
561 : {
562 2 : DBG_DEBUG("Unset pending [%p]\n", s);
563 2 : if (s->slq != NULL) {
564 2 : SLQ_DEBUG(10, s->slq, "unset pending");
565 : }
566 :
567 2 : s->pending = false;
568 2 : talloc_set_destructor(s, search_destructor);
569 2 : }
570 :
571 2 : static struct tevent_req *mds_es_search_send(TALLOC_CTX *mem_ctx,
572 : struct tevent_context *ev,
573 : struct sl_es_search *s)
574 : {
575 2 : struct tevent_req *req = NULL;
576 2 : struct tevent_req *subreq = NULL;
577 2 : struct mds_es_search_state *state = NULL;
578 2 : const char *index = NULL;
579 2 : char *elastic_query = NULL;
580 2 : char *uri = NULL;
581 : size_t elastic_query_len;
582 2 : char *elastic_query_len_str = NULL;
583 2 : char *hostname = NULL;
584 2 : bool pretty = false;
585 :
586 2 : req = tevent_req_create(mem_ctx, &state, struct mds_es_search_state);
587 2 : if (req == NULL) {
588 0 : return NULL;
589 : }
590 2 : *state = (struct mds_es_search_state) {
591 : .ev = ev,
592 : .s = s,
593 : };
594 :
595 2 : if (!tevent_req_set_endtime(req, ev, timeval_current_ofs(60, 0))) {
596 0 : return tevent_req_post(req, s->ev);
597 : }
598 :
599 2 : index = lp_parm_const_string(s->slq->mds_ctx->snum,
600 : "elasticsearch",
601 : "index",
602 : "_all");
603 2 : if (tevent_req_nomem(index, req)) {
604 0 : return tevent_req_post(req, ev);
605 : }
606 :
607 2 : if (DEBUGLVL(10)) {
608 0 : pretty = true;
609 : }
610 :
611 2 : uri = talloc_asprintf(state,
612 : "/%s/_search%s",
613 : index,
614 : pretty ? "?pretty" : "");
615 2 : if (tevent_req_nomem(uri, req)) {
616 0 : return tevent_req_post(req, ev);
617 : }
618 :
619 2 : elastic_query = talloc_asprintf(state,
620 : MDSSVC_ELASTIC_QUERY_TEMPLATE,
621 : s->from,
622 : s->size,
623 : MDSSVC_ELASTIC_SOURCES,
624 : s->es_query);
625 2 : if (tevent_req_nomem(elastic_query, req)) {
626 0 : return tevent_req_post(req, ev);
627 : }
628 2 : DBG_DEBUG("Elastic query: '%s'\n", elastic_query);
629 :
630 2 : elastic_query_len = strlen(elastic_query);
631 :
632 4 : state->http_request = (struct http_request) {
633 : .type = HTTP_REQ_POST,
634 : .uri = uri,
635 2 : .body = data_blob_const(elastic_query, elastic_query_len),
636 : .major = '1',
637 : .minor = '1',
638 : };
639 :
640 2 : elastic_query_len_str = talloc_asprintf(state, "%zu", elastic_query_len);
641 2 : if (tevent_req_nomem(elastic_query_len_str, req)) {
642 0 : return tevent_req_post(req, ev);
643 : }
644 :
645 2 : hostname = get_myname(state);
646 2 : if (tevent_req_nomem(hostname, req)) {
647 0 : return tevent_req_post(req, ev);
648 : }
649 :
650 2 : http_add_header(state, &state->http_request.headers,
651 : "Content-Type", "application/json");
652 2 : http_add_header(state, &state->http_request.headers,
653 : "Accept", "application/json");
654 2 : http_add_header(state, &state->http_request.headers,
655 : "User-Agent", "Samba/mdssvc");
656 2 : http_add_header(state, &state->http_request.headers,
657 : "Host", hostname);
658 2 : http_add_header(state, &state->http_request.headers,
659 : "Content-Length", elastic_query_len_str);
660 :
661 2 : subreq = http_send_request_send(state,
662 : ev,
663 2 : s->mds_es_ctx->http_conn,
664 2 : &state->http_request);
665 2 : if (tevent_req_nomem(subreq, req)) {
666 0 : return tevent_req_post(req, ev);
667 : }
668 2 : tevent_req_set_callback(subreq, mds_es_search_http_send_done, req);
669 2 : return req;
670 : }
671 :
672 2 : static void mds_es_search_http_send_done(struct tevent_req *subreq)
673 : {
674 2 : struct tevent_req *req = tevent_req_callback_data(
675 : subreq, struct tevent_req);
676 2 : struct mds_es_search_state *state = tevent_req_data(
677 : req, struct mds_es_search_state);
678 : NTSTATUS status;
679 :
680 2 : DBG_DEBUG("Sent out search [%p]\n", state->s);
681 :
682 2 : status = http_send_request_recv(subreq);
683 2 : TALLOC_FREE(subreq);
684 2 : if (!NT_STATUS_IS_OK(status)) {
685 0 : tevent_req_error(req, map_errno_from_nt_status(status));
686 0 : return;
687 : }
688 :
689 2 : if (state->s->mds_es_ctx == NULL || state->s->slq == NULL) {
690 0 : tevent_req_done(req);
691 0 : return;
692 : }
693 :
694 2 : subreq = http_read_response_send(state,
695 : state->ev,
696 2 : state->s->mds_es_ctx->http_conn,
697 : SL_PAGESIZE * 8192);
698 2 : if (tevent_req_nomem(subreq, req)) {
699 0 : return;
700 : }
701 2 : tevent_req_set_callback(subreq, mds_es_search_http_read_done, req);
702 : }
703 :
704 2 : static void mds_es_search_http_read_done(struct tevent_req *subreq)
705 : {
706 2 : struct tevent_req *req = tevent_req_callback_data(
707 : subreq, struct tevent_req);
708 2 : struct mds_es_search_state *state = tevent_req_data(
709 : req, struct mds_es_search_state);
710 2 : struct sl_es_search *s = state->s;
711 2 : struct sl_query *slq = s->slq;
712 2 : json_t *root = NULL;
713 2 : json_t *matches = NULL;
714 2 : json_t *match = NULL;
715 : size_t i;
716 : json_error_t error;
717 : size_t hits;
718 : NTSTATUS status;
719 : int ret;
720 : bool ok;
721 :
722 2 : DBG_DEBUG("Got response for search [%p]\n", s);
723 :
724 2 : status = http_read_response_recv(subreq, state, &state->http_response);
725 2 : TALLOC_FREE(subreq);
726 2 : if (!NT_STATUS_IS_OK(status)) {
727 0 : DBG_DEBUG("HTTP response failed: %s\n", nt_errstr(status));
728 0 : tevent_req_error(req, map_errno_from_nt_status(status));
729 0 : return;
730 : }
731 :
732 2 : if (slq == NULL || s->mds_es_ctx == NULL) {
733 0 : tevent_req_done(req);
734 0 : return;
735 : }
736 :
737 2 : switch (state->http_response->response_code) {
738 2 : case 200:
739 2 : break;
740 0 : default:
741 0 : DBG_ERR("HTTP server response: %u\n",
742 : state->http_response->response_code);
743 0 : goto fail;
744 : }
745 :
746 2 : DBG_DEBUG("JSON response:\n%s\n",
747 : talloc_strndup(talloc_tos(),
748 : (char *)state->http_response->body.data,
749 : state->http_response->body.length));
750 :
751 2 : root = json_loadb((char *)state->http_response->body.data,
752 2 : state->http_response->body.length,
753 : 0,
754 : &error);
755 2 : if (root == NULL) {
756 0 : DBG_ERR("json_loadb failed\n");
757 0 : goto fail;
758 : }
759 :
760 2 : if (s->total == 0) {
761 : /*
762 : * Get the total number of results the first time, format
763 : * used by Elasticsearch 7.0 or newer
764 : */
765 2 : ret = json_unpack(root, "{s: {s: {s: i}}}",
766 : "hits", "total", "value", &s->total);
767 2 : if (ret != 0) {
768 : /* Format used before 7.0 */
769 0 : ret = json_unpack(root, "{s: {s: i}}",
770 : "hits", "total", &s->total);
771 0 : if (ret != 0) {
772 0 : DBG_ERR("json_unpack failed\n");
773 0 : goto fail;
774 : }
775 : }
776 :
777 2 : DBG_DEBUG("Total: %zu\n", s->total);
778 :
779 2 : if (s->total == 0) {
780 0 : json_decref(root);
781 0 : tevent_req_done(req);
782 0 : return;
783 : }
784 : }
785 :
786 2 : if (s->max == 0 || s->max > s->total) {
787 2 : s->max = s->total;
788 : }
789 :
790 2 : ret = json_unpack(root, "{s: {s:o}}",
791 : "hits", "hits", &matches);
792 2 : if (ret != 0 || matches == NULL) {
793 0 : DBG_ERR("json_unpack hits failed\n");
794 0 : goto fail;
795 : }
796 :
797 2 : hits = json_array_size(matches);
798 2 : if (hits == 0) {
799 0 : DBG_ERR("Hu?! No results?\n");
800 0 : goto fail;
801 : }
802 2 : DBG_DEBUG("Hits: %zu\n", hits);
803 :
804 6 : for (i = 0; i < hits && s->from + i < s->max; i++) {
805 4 : const char *path = NULL;
806 :
807 4 : match = json_array_get(matches, i);
808 4 : if (match == NULL) {
809 0 : DBG_ERR("Hu?! No value for index %zu\n", i);
810 0 : goto fail;
811 : }
812 4 : ret = json_unpack(match,
813 : "{s: {s: {s: s}}}",
814 : "_source",
815 : "path",
816 : "real",
817 : &path);
818 4 : if (ret != 0) {
819 0 : DBG_ERR("Missing path.real in JSON result\n");
820 0 : goto fail;
821 : }
822 :
823 4 : ok = mds_add_result(slq, path);
824 4 : if (!ok) {
825 0 : DBG_ERR("error adding result for path: %s\n", path);
826 0 : goto fail;
827 : }
828 : }
829 2 : json_decref(root);
830 :
831 2 : s->from += hits;
832 2 : slq->state = SLQ_STATE_RESULTS;
833 2 : tevent_req_done(req);
834 2 : return;
835 :
836 0 : fail:
837 0 : if (root != NULL) {
838 0 : json_decref(root);
839 : }
840 0 : slq->state = SLQ_STATE_ERROR;
841 0 : tevent_req_error(req, EINVAL);
842 0 : return;
843 : }
844 :
845 2 : static int mds_es_search_recv(struct tevent_req *req)
846 : {
847 2 : return tevent_req_simple_recv_unix(req);
848 : }
849 :
850 0 : static bool mds_es_search_cont(struct sl_query *slq)
851 : {
852 0 : struct sl_es_search *s = talloc_get_type_abort(
853 : slq->backend_private, struct sl_es_search);
854 :
855 0 : SLQ_DEBUG(10, slq, "continue");
856 0 : DLIST_ADD_END(s->mds_es_ctx->searches, s);
857 0 : return mds_es_next_search_trigger(s->mds_es_ctx);
858 : }
859 :
860 : struct mdssvc_backend mdsscv_backend_es = {
861 : .init = mdssvc_es_init,
862 : .shutdown = mdssvc_es_shutdown,
863 : .connect = mds_es_connect,
864 : .search_start = mds_es_search,
865 : .search_cont = mds_es_search_cont,
866 : };
|