Line data Source code
1 : /*
2 : Unix SMB/CIFS implementation.
3 : Samba internal messaging functions
4 : Copyright (C) Andrew Tridgell 2000
5 : Copyright (C) 2001 by Martin Pool
6 : Copyright (C) 2002 by Jeremy Allison
7 : Copyright (C) 2007 by Volker Lendecke
8 :
9 : This program is free software; you can redistribute it and/or modify
10 : it under the terms of the GNU General Public License as published by
11 : the Free Software Foundation; either version 3 of the License, or
12 : (at your option) any later version.
13 :
14 : This program is distributed in the hope that it will be useful,
15 : but WITHOUT ANY WARRANTY; without even the implied warranty of
16 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 : GNU General Public License for more details.
18 :
19 : You should have received a copy of the GNU General Public License
20 : along with this program. If not, see <http://www.gnu.org/licenses/>.
21 : */
22 :
23 : /**
24 : @defgroup messages Internal messaging framework
25 : @{
26 : @file messages.c
27 :
28 : @brief Module for internal messaging between Samba daemons.
29 :
30 : The idea is that if a part of Samba wants to do communication with
31 : another Samba process then it will do a message_register() of a
32 : dispatch function, and use message_send_pid() to send messages to
33 : that process.
34 :
35 : The dispatch function is given the pid of the sender, and it can
36 : use that to reply by message_send_pid(). See ping_message() for a
37 : simple example.
38 :
39 : @caution Dispatch functions must be able to cope with incoming
40 : messages on an *odd* byte boundary.
41 :
42 : This system doesn't have any inherent size limitations but is not
43 : very efficient for large messages or when messages are sent in very
44 : quick succession.
45 :
46 : */
47 :
48 : #include "includes.h"
49 : #include "lib/util/server_id.h"
50 : #include "dbwrap/dbwrap.h"
51 : #include "serverid.h"
52 : #include "messages.h"
53 : #include "lib/util/tevent_unix.h"
54 : #include "lib/background.h"
55 : #include "lib/messaging/messages_dgm.h"
56 : #include "lib/util/iov_buf.h"
57 : #include "lib/util/server_id_db.h"
58 : #include "lib/messaging/messages_dgm_ref.h"
59 : #include "lib/messages_ctdb.h"
60 : #include "lib/messages_ctdb_ref.h"
61 : #include "lib/messages_util.h"
62 : #include "cluster_support.h"
63 : #include "ctdbd_conn.h"
64 : #include "ctdb_srvids.h"
65 : #include "source3/lib/tallocmsg.h"
66 :
67 : #ifdef CLUSTER_SUPPORT
68 : #include "ctdb_protocol.h"
69 : #endif
70 :
71 : struct messaging_callback {
72 : struct messaging_callback *prev, *next;
73 : uint32_t msg_type;
74 : void (*fn)(struct messaging_context *msg, void *private_data,
75 : uint32_t msg_type,
76 : struct server_id server_id, DATA_BLOB *data);
77 : void *private_data;
78 : };
79 :
80 : struct messaging_registered_ev {
81 : struct tevent_context *ev;
82 : struct tevent_immediate *im;
83 : size_t refcount;
84 : };
85 :
86 : struct messaging_context {
87 : struct server_id id;
88 : struct tevent_context *event_ctx;
89 : struct messaging_callback *callbacks;
90 :
91 : struct messaging_rec *posted_msgs;
92 :
93 : struct messaging_registered_ev *event_contexts;
94 :
95 : struct tevent_req **new_waiters;
96 : size_t num_new_waiters;
97 :
98 : struct tevent_req **waiters;
99 : size_t num_waiters;
100 :
101 : struct server_id_db *names_db;
102 :
103 : TALLOC_CTX *per_process_talloc_ctx;
104 : };
105 :
106 : static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
107 : struct messaging_rec *rec);
108 : static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
109 : struct messaging_rec *rec);
110 : static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
111 : struct tevent_context *ev,
112 : struct messaging_rec *rec);
113 : static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
114 : struct tevent_context *ev,
115 : struct messaging_rec *rec);
116 :
117 : /****************************************************************************
118 : A useful function for testing the message system.
119 : ****************************************************************************/
120 :
121 105 : static void ping_message(struct messaging_context *msg_ctx,
122 : void *private_data,
123 : uint32_t msg_type,
124 : struct server_id src,
125 : DATA_BLOB *data)
126 : {
127 105 : struct server_id_buf idbuf;
128 :
129 210 : DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
130 : server_id_str_buf(src, &idbuf), (int)data->length,
131 : data->data ? (char *)data->data : ""));
132 :
133 105 : messaging_send(msg_ctx, src, MSG_PONG, data);
134 105 : }
135 :
136 172849 : struct messaging_rec *messaging_rec_create(
137 : TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
138 : uint32_t msg_type, const struct iovec *iov, int iovlen,
139 : const int *fds, size_t num_fds)
140 : {
141 369 : ssize_t buflen;
142 369 : uint8_t *buf;
143 369 : struct messaging_rec *result;
144 :
145 172849 : if (num_fds > INT8_MAX) {
146 0 : return NULL;
147 : }
148 :
149 172849 : buflen = iov_buflen(iov, iovlen);
150 172849 : if (buflen == -1) {
151 0 : return NULL;
152 : }
153 172849 : buf = talloc_array(mem_ctx, uint8_t, buflen);
154 172849 : if (buf == NULL) {
155 0 : return NULL;
156 : }
157 172849 : iov_buf(iov, iovlen, buf, buflen);
158 :
159 172849 : {
160 369 : struct messaging_rec rec;
161 172849 : int64_t fds64[MAX(1, num_fds)];
162 369 : size_t i;
163 :
164 172850 : for (i=0; i<num_fds; i++) {
165 1 : fds64[i] = fds[i];
166 : }
167 :
168 172849 : rec = (struct messaging_rec) {
169 : .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
170 : .src = src, .dest = dst,
171 : .buf.data = buf, .buf.length = buflen,
172 : .num_fds = num_fds, .fds = fds64,
173 : };
174 :
175 172849 : result = messaging_rec_dup(mem_ctx, &rec);
176 : }
177 :
178 172849 : TALLOC_FREE(buf);
179 :
180 172849 : return result;
181 : }
182 :
183 145039 : static bool messaging_register_event_context(struct messaging_context *ctx,
184 : struct tevent_context *ev)
185 : {
186 4016 : size_t i, num_event_contexts;
187 145039 : struct messaging_registered_ev *free_reg = NULL;
188 4016 : struct messaging_registered_ev *tmp;
189 :
190 145039 : num_event_contexts = talloc_array_length(ctx->event_contexts);
191 :
192 145072 : for (i=0; i<num_event_contexts; i++) {
193 139819 : struct messaging_registered_ev *reg = &ctx->event_contexts[i];
194 :
195 139819 : if (reg->refcount == 0) {
196 3 : if (reg->ev != NULL) {
197 0 : abort();
198 : }
199 3 : free_reg = reg;
200 : /*
201 : * We continue here and may find another
202 : * free_req, but the important thing is
203 : * that we continue to search for an
204 : * existing registration in the loop.
205 : */
206 3 : continue;
207 : }
208 :
209 139816 : if (reg->ev == ev) {
210 139786 : reg->refcount += 1;
211 139786 : return true;
212 : }
213 : }
214 :
215 5253 : if (free_reg == NULL) {
216 5250 : struct tevent_immediate *im = NULL;
217 :
218 5250 : im = tevent_create_immediate(ctx);
219 5250 : if (im == NULL) {
220 0 : return false;
221 : }
222 :
223 5250 : tmp = talloc_realloc(ctx, ctx->event_contexts,
224 : struct messaging_registered_ev,
225 : num_event_contexts+1);
226 5250 : if (tmp == NULL) {
227 0 : return false;
228 : }
229 5250 : ctx->event_contexts = tmp;
230 :
231 5250 : free_reg = &ctx->event_contexts[num_event_contexts];
232 5250 : free_reg->im = talloc_move(ctx->event_contexts, &im);
233 : }
234 :
235 : /*
236 : * free_reg->im might be cached
237 : */
238 5253 : free_reg->ev = ev;
239 5253 : free_reg->refcount = 1;
240 :
241 5253 : return true;
242 : }
243 :
244 133325 : static bool messaging_deregister_event_context(struct messaging_context *ctx,
245 : struct tevent_context *ev)
246 : {
247 3744 : size_t i, num_event_contexts;
248 :
249 133325 : num_event_contexts = talloc_array_length(ctx->event_contexts);
250 :
251 133355 : for (i=0; i<num_event_contexts; i++) {
252 133355 : struct messaging_registered_ev *reg = &ctx->event_contexts[i];
253 :
254 133355 : if (reg->refcount == 0) {
255 0 : continue;
256 : }
257 :
258 133355 : if (reg->ev == ev) {
259 133325 : reg->refcount -= 1;
260 :
261 133325 : if (reg->refcount == 0) {
262 : /*
263 : * The primary event context
264 : * is never unregistered using
265 : * messaging_deregister_event_context()
266 : * it's only registered using
267 : * messaging_register_event_context().
268 : */
269 30 : SMB_ASSERT(ev != ctx->event_ctx);
270 30 : SMB_ASSERT(reg->ev != ctx->event_ctx);
271 :
272 : /*
273 : * Not strictly necessary, just
274 : * paranoia
275 : */
276 30 : reg->ev = NULL;
277 :
278 : /*
279 : * Do not talloc_free(reg->im),
280 : * recycle immediates events.
281 : *
282 : * We just invalidate it using
283 : * the primary event context,
284 : * which is never unregistered.
285 : */
286 30 : tevent_schedule_immediate(reg->im,
287 : ctx->event_ctx,
288 3744 : NULL, NULL);
289 : }
290 133325 : return true;
291 : }
292 : }
293 0 : return false;
294 : }
295 :
296 172361 : static void messaging_post_main_event_context(struct tevent_context *ev,
297 : struct tevent_immediate *im,
298 : void *private_data)
299 : {
300 172361 : struct messaging_context *ctx = talloc_get_type_abort(
301 : private_data, struct messaging_context);
302 :
303 373426 : while (ctx->posted_msgs != NULL) {
304 172794 : struct messaging_rec *rec = ctx->posted_msgs;
305 366 : bool consumed;
306 :
307 172794 : DLIST_REMOVE(ctx->posted_msgs, rec);
308 :
309 172794 : consumed = messaging_dispatch_classic(ctx, rec);
310 172790 : if (!consumed) {
311 22044 : consumed = messaging_dispatch_waiters(
312 : ctx, ctx->event_ctx, rec);
313 : }
314 :
315 172790 : if (!consumed) {
316 : uint8_t i;
317 :
318 20599 : for (i=0; i<rec->num_fds; i++) {
319 0 : close(rec->fds[i]);
320 : }
321 : }
322 :
323 173143 : TALLOC_FREE(rec);
324 : }
325 172357 : }
326 :
327 0 : static void messaging_post_sub_event_context(struct tevent_context *ev,
328 : struct tevent_immediate *im,
329 : void *private_data)
330 : {
331 0 : struct messaging_context *ctx = talloc_get_type_abort(
332 : private_data, struct messaging_context);
333 0 : struct messaging_rec *rec, *next;
334 :
335 0 : for (rec = ctx->posted_msgs; rec != NULL; rec = next) {
336 0 : bool consumed;
337 :
338 0 : next = rec->next;
339 :
340 0 : consumed = messaging_dispatch_waiters(ctx, ev, rec);
341 0 : if (consumed) {
342 0 : DLIST_REMOVE(ctx->posted_msgs, rec);
343 0 : TALLOC_FREE(rec);
344 : }
345 : }
346 0 : }
347 :
348 172849 : static bool messaging_alert_event_contexts(struct messaging_context *ctx)
349 : {
350 369 : size_t i, num_event_contexts;
351 :
352 172849 : num_event_contexts = talloc_array_length(ctx->event_contexts);
353 :
354 345728 : for (i=0; i<num_event_contexts; i++) {
355 172879 : struct messaging_registered_ev *reg = &ctx->event_contexts[i];
356 :
357 172879 : if (reg->refcount == 0) {
358 26 : continue;
359 : }
360 :
361 : /*
362 : * We depend on schedule_immediate to work
363 : * multiple times. Might be a bit inefficient,
364 : * but this needs to be proven in tests. The
365 : * alternatively would be to track whether the
366 : * immediate has already been scheduled. For
367 : * now, avoid that complexity here.
368 : */
369 :
370 172853 : if (reg->ev == ctx->event_ctx) {
371 172849 : tevent_schedule_immediate(
372 : reg->im, reg->ev,
373 : messaging_post_main_event_context,
374 369 : ctx);
375 : } else {
376 4 : tevent_schedule_immediate(
377 : reg->im, reg->ev,
378 : messaging_post_sub_event_context,
379 371 : ctx);
380 : }
381 :
382 : }
383 172849 : return true;
384 : }
385 :
386 85326 : static void messaging_recv_cb(struct tevent_context *ev,
387 : const uint8_t *msg, size_t msg_len,
388 : int *fds, size_t num_fds,
389 : void *private_data)
390 85326 : {
391 85326 : struct messaging_context *msg_ctx = talloc_get_type_abort(
392 : private_data, struct messaging_context);
393 344 : struct server_id_buf idbuf;
394 344 : struct messaging_rec rec;
395 85326 : int64_t fds64[MAX(1, MIN(num_fds, INT8_MAX))];
396 344 : size_t i;
397 :
398 85326 : if (msg_len < MESSAGE_HDR_LENGTH) {
399 0 : DBG_WARNING("message too short: %zu\n", msg_len);
400 0 : return;
401 : }
402 :
403 85326 : if (num_fds > INT8_MAX) {
404 0 : DBG_WARNING("too many fds: %zu\n", num_fds);
405 0 : return;
406 : }
407 :
408 121186 : for (i=0; i < num_fds; i++) {
409 35860 : fds64[i] = fds[i];
410 : }
411 :
412 85326 : rec = (struct messaging_rec) {
413 : .msg_version = MESSAGE_VERSION,
414 85326 : .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
415 85326 : .buf.length = msg_len - MESSAGE_HDR_LENGTH,
416 : .num_fds = num_fds,
417 : .fds = fds64,
418 : };
419 :
420 85326 : message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
421 :
422 85326 : DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
423 : (unsigned)rec.msg_type, rec.buf.length, num_fds,
424 : server_id_str_buf(rec.src, &idbuf));
425 :
426 85326 : if (server_id_same_process(&rec.src, &msg_ctx->id)) {
427 0 : DBG_DEBUG("Ignoring self-send\n");
428 0 : return;
429 : }
430 :
431 85326 : messaging_dispatch_rec(msg_ctx, ev, &rec);
432 :
433 121530 : for (i=0; i<num_fds; i++) {
434 35860 : fds[i] = fds64[i];
435 : }
436 : }
437 :
438 36529 : static int messaging_context_destructor(struct messaging_context *ctx)
439 : {
440 986 : size_t i;
441 :
442 203052 : for (i=0; i<ctx->num_new_waiters; i++) {
443 166523 : if (ctx->new_waiters[i] != NULL) {
444 34081 : tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
445 34081 : ctx->new_waiters[i] = NULL;
446 : }
447 : }
448 56570 : for (i=0; i<ctx->num_waiters; i++) {
449 20041 : if (ctx->waiters[i] != NULL) {
450 2455 : tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
451 2455 : ctx->waiters[i] = NULL;
452 : }
453 : }
454 :
455 : /*
456 : * The immediates from messaging_alert_event_contexts
457 : * reference "ctx". Don't let them outlive the
458 : * messaging_context we're destroying here.
459 : */
460 36529 : TALLOC_FREE(ctx->event_contexts);
461 :
462 36529 : return 0;
463 : }
464 :
465 37401 : static const char *private_path(const char *name)
466 : {
467 37401 : return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
468 : }
469 :
470 5223 : static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
471 : struct tevent_context *ev,
472 : struct messaging_context **pmsg_ctx)
473 : {
474 137 : TALLOC_CTX *frame;
475 137 : struct messaging_context *ctx;
476 137 : NTSTATUS status;
477 137 : int ret;
478 137 : const char *lck_path;
479 137 : const char *priv_path;
480 137 : void *ref;
481 137 : bool ok;
482 :
483 : /*
484 : * sec_init() *must* be called before any other
485 : * functions that use sec_XXX(). e.g. sec_initial_uid().
486 : */
487 :
488 5223 : sec_init();
489 :
490 5223 : lck_path = lock_path(talloc_tos(), "msg.lock");
491 5223 : if (lck_path == NULL) {
492 0 : return NT_STATUS_NO_MEMORY;
493 : }
494 :
495 5223 : ok = directory_create_or_exist_strict(lck_path,
496 : sec_initial_uid(),
497 : 0755);
498 5223 : if (!ok) {
499 0 : DBG_DEBUG("Could not create lock directory: %s\n",
500 : strerror(errno));
501 0 : return NT_STATUS_ACCESS_DENIED;
502 : }
503 :
504 5223 : priv_path = private_path("msg.sock");
505 5223 : if (priv_path == NULL) {
506 0 : return NT_STATUS_NO_MEMORY;
507 : }
508 :
509 5223 : ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
510 : 0700);
511 5223 : if (!ok) {
512 0 : DBG_DEBUG("Could not create msg directory: %s\n",
513 : strerror(errno));
514 0 : return NT_STATUS_ACCESS_DENIED;
515 : }
516 :
517 5223 : frame = talloc_stackframe();
518 5223 : if (frame == NULL) {
519 0 : return NT_STATUS_NO_MEMORY;
520 : }
521 :
522 5223 : ctx = talloc_zero(frame, struct messaging_context);
523 5223 : if (ctx == NULL) {
524 0 : status = NT_STATUS_NO_MEMORY;
525 0 : goto done;
526 : }
527 :
528 5360 : ctx->id = (struct server_id) {
529 5223 : .pid = tevent_cached_getpid(), .vnn = NONCLUSTER_VNN
530 : };
531 :
532 5223 : ctx->event_ctx = ev;
533 :
534 5223 : ctx->per_process_talloc_ctx = talloc_new(ctx);
535 5223 : if (ctx->per_process_talloc_ctx == NULL) {
536 0 : status = NT_STATUS_NO_MEMORY;
537 0 : goto done;
538 : }
539 :
540 5223 : ok = messaging_register_event_context(ctx, ev);
541 5223 : if (!ok) {
542 0 : status = NT_STATUS_NO_MEMORY;
543 0 : goto done;
544 : }
545 :
546 5223 : ref = messaging_dgm_ref(
547 : ctx->per_process_talloc_ctx,
548 : ctx->event_ctx,
549 : &ctx->id.unique_id,
550 : priv_path,
551 : lck_path,
552 : messaging_recv_cb,
553 : ctx,
554 : &ret);
555 5223 : if (ref == NULL) {
556 5 : DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
557 5 : status = map_nt_error_from_unix(ret);
558 5 : goto done;
559 : }
560 5218 : talloc_set_destructor(ctx, messaging_context_destructor);
561 :
562 : #ifdef CLUSTER_SUPPORT
563 : if (lp_clustering()) {
564 : ref = messaging_ctdb_ref(
565 : ctx->per_process_talloc_ctx,
566 : ctx->event_ctx,
567 : lp_ctdbd_socket(),
568 : lp_ctdb_timeout(),
569 : ctx->id.unique_id,
570 : messaging_recv_cb,
571 : ctx,
572 : &ret);
573 : if (ref == NULL) {
574 : DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
575 : strerror(ret));
576 : status = map_nt_error_from_unix(ret);
577 : goto done;
578 : }
579 : }
580 : #endif
581 :
582 5218 : ctx->id.vnn = get_my_vnn();
583 :
584 5218 : ctx->names_db = server_id_db_init(ctx,
585 : ctx->id,
586 : lp_lock_directory(),
587 : 0,
588 : TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
589 5218 : if (ctx->names_db == NULL) {
590 0 : DBG_DEBUG("server_id_db_init failed\n");
591 0 : status = NT_STATUS_NO_MEMORY;
592 0 : goto done;
593 : }
594 :
595 5218 : messaging_register(ctx, NULL, MSG_PING, ping_message);
596 :
597 : /* Register some debugging related messages */
598 :
599 5218 : register_msg_pool_usage(ctx->per_process_talloc_ctx, ctx);
600 5218 : register_dmalloc_msgs(ctx);
601 5218 : debug_register_msgs(ctx);
602 :
603 : {
604 132 : struct server_id_buf tmp;
605 5218 : DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
606 : }
607 :
608 5218 : *pmsg_ctx = talloc_steal(mem_ctx, ctx);
609 :
610 5218 : status = NT_STATUS_OK;
611 5223 : done:
612 5223 : TALLOC_FREE(frame);
613 :
614 5223 : return status;
615 : }
616 :
617 5223 : struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
618 : struct tevent_context *ev)
619 : {
620 5223 : struct messaging_context *ctx = NULL;
621 137 : NTSTATUS status;
622 :
623 5223 : status = messaging_init_internal(mem_ctx,
624 : ev,
625 : &ctx);
626 5223 : if (!NT_STATUS_IS_OK(status)) {
627 0 : return NULL;
628 : }
629 :
630 5218 : return ctx;
631 : }
632 :
633 5879637 : struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
634 : {
635 5879637 : return msg_ctx->id;
636 : }
637 :
638 : /*
639 : * re-init after a fork
640 : */
641 32178 : NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
642 : {
643 875 : int ret;
644 875 : char *lck_path;
645 875 : void *ref;
646 :
647 32178 : TALLOC_FREE(msg_ctx->per_process_talloc_ctx);
648 :
649 32178 : msg_ctx->per_process_talloc_ctx = talloc_new(msg_ctx);
650 32178 : if (msg_ctx->per_process_talloc_ctx == NULL) {
651 0 : return NT_STATUS_NO_MEMORY;
652 : }
653 :
654 33053 : msg_ctx->id = (struct server_id) {
655 32178 : .pid = tevent_cached_getpid(), .vnn = msg_ctx->id.vnn
656 : };
657 :
658 32178 : lck_path = lock_path(talloc_tos(), "msg.lock");
659 32178 : if (lck_path == NULL) {
660 0 : return NT_STATUS_NO_MEMORY;
661 : }
662 :
663 32178 : ref = messaging_dgm_ref(
664 : msg_ctx->per_process_talloc_ctx,
665 : msg_ctx->event_ctx,
666 : &msg_ctx->id.unique_id,
667 : private_path("msg.sock"),
668 : lck_path,
669 : messaging_recv_cb,
670 : msg_ctx,
671 : &ret);
672 :
673 32178 : if (ref == NULL) {
674 0 : DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
675 0 : return map_nt_error_from_unix(ret);
676 : }
677 :
678 32178 : if (lp_clustering()) {
679 0 : ref = messaging_ctdb_ref(
680 : msg_ctx->per_process_talloc_ctx,
681 : msg_ctx->event_ctx,
682 : lp_ctdbd_socket(),
683 : lp_ctdb_timeout(),
684 : msg_ctx->id.unique_id,
685 : messaging_recv_cb,
686 : msg_ctx,
687 : &ret);
688 0 : if (ref == NULL) {
689 0 : DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
690 : strerror(ret));
691 0 : return map_nt_error_from_unix(ret);
692 : }
693 : }
694 :
695 32178 : server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
696 32178 : register_msg_pool_usage(msg_ctx->per_process_talloc_ctx, msg_ctx);
697 :
698 32178 : return NT_STATUS_OK;
699 : }
700 :
701 :
702 : /*
703 : * Register a dispatch function for a particular message type. Allow multiple
704 : * registrants
705 : */
706 479375 : NTSTATUS messaging_register(struct messaging_context *msg_ctx,
707 : void *private_data,
708 : uint32_t msg_type,
709 : void (*fn)(struct messaging_context *msg,
710 : void *private_data,
711 : uint32_t msg_type,
712 : struct server_id server_id,
713 : DATA_BLOB *data))
714 : {
715 12198 : struct messaging_callback *cb;
716 :
717 479375 : DEBUG(5, ("Registering messaging pointer for type %u - "
718 : "private_data=%p\n",
719 : (unsigned)msg_type, private_data));
720 :
721 : /*
722 : * Only one callback per type
723 : */
724 :
725 11114520 : for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
726 : /* we allow a second registration of the same message
727 : type if it has a different private pointer. This is
728 : needed in, for example, the internal notify code,
729 : which creates a new notify context for each tree
730 : connect, and expects to receive messages to each of
731 : them. */
732 10667121 : if (cb->msg_type == msg_type && private_data == cb->private_data) {
733 31976 : DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
734 : (unsigned)msg_type, private_data));
735 31976 : cb->fn = fn;
736 31976 : cb->private_data = private_data;
737 31976 : return NT_STATUS_OK;
738 : }
739 : }
740 :
741 447399 : if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
742 0 : return NT_STATUS_NO_MEMORY;
743 : }
744 :
745 447399 : cb->msg_type = msg_type;
746 447399 : cb->fn = fn;
747 447399 : cb->private_data = private_data;
748 :
749 447399 : DLIST_ADD(msg_ctx->callbacks, cb);
750 447399 : return NT_STATUS_OK;
751 : }
752 :
753 : /*
754 : De-register the function for a particular message type.
755 : */
756 212833 : void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
757 : void *private_data)
758 : {
759 5526 : struct messaging_callback *cb, *next;
760 :
761 5877010 : for (cb = ctx->callbacks; cb; cb = next) {
762 5664177 : next = cb->next;
763 5664177 : if ((cb->msg_type == msg_type)
764 203317 : && (cb->private_data == private_data)) {
765 203317 : DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
766 : (unsigned)msg_type, private_data));
767 203317 : DLIST_REMOVE(ctx->callbacks, cb);
768 203317 : TALLOC_FREE(cb);
769 : }
770 : }
771 212833 : }
772 :
773 : /*
774 : Send a message to a particular server
775 : */
776 224039 : NTSTATUS messaging_send(struct messaging_context *msg_ctx,
777 : struct server_id server, uint32_t msg_type,
778 : const DATA_BLOB *data)
779 : {
780 224039 : struct iovec iov = {0};
781 :
782 224039 : if (data != NULL) {
783 223505 : iov.iov_base = data->data;
784 223505 : iov.iov_len = data->length;
785 585 : };
786 :
787 224039 : return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
788 : }
789 :
790 16352 : NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
791 : struct server_id server, uint32_t msg_type,
792 : const uint8_t *buf, size_t len)
793 : {
794 16352 : DATA_BLOB blob = data_blob_const(buf, len);
795 16352 : return messaging_send(msg_ctx, server, msg_type, &blob);
796 : }
797 :
798 172849 : static int messaging_post_self(struct messaging_context *msg_ctx,
799 : struct server_id src, struct server_id dst,
800 : uint32_t msg_type,
801 : const struct iovec *iov, int iovlen,
802 : const int *fds, size_t num_fds)
803 : {
804 369 : struct messaging_rec *rec;
805 369 : bool ok;
806 :
807 172849 : rec = messaging_rec_create(
808 : msg_ctx, src, dst, msg_type, iov, iovlen, fds, num_fds);
809 172849 : if (rec == NULL) {
810 0 : return ENOMEM;
811 : }
812 :
813 172849 : ok = messaging_alert_event_contexts(msg_ctx);
814 172849 : if (!ok) {
815 0 : TALLOC_FREE(rec);
816 0 : return ENOMEM;
817 : }
818 :
819 172849 : DLIST_ADD_END(msg_ctx->posted_msgs, rec);
820 :
821 172480 : return 0;
822 : }
823 :
824 585858 : int messaging_send_iov_from(struct messaging_context *msg_ctx,
825 : struct server_id src, struct server_id dst,
826 : uint32_t msg_type,
827 : const struct iovec *iov, int iovlen,
828 : const int *fds, size_t num_fds)
829 585858 : {
830 1686 : int ret;
831 1686 : uint8_t hdr[MESSAGE_HDR_LENGTH];
832 585858 : struct iovec iov2[iovlen+1];
833 :
834 585858 : if (server_id_is_disconnected(&dst)) {
835 0 : return EINVAL;
836 : }
837 :
838 585858 : if (num_fds > INT8_MAX) {
839 0 : return EINVAL;
840 : }
841 :
842 585858 : if (server_id_equal(&dst, &msg_ctx->id)) {
843 172579 : ret = messaging_post_self(msg_ctx, src, dst, msg_type,
844 : iov, iovlen, fds, num_fds);
845 172579 : return ret;
846 : }
847 :
848 413279 : message_hdr_put(hdr, msg_type, src, dst);
849 413279 : iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
850 413279 : memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
851 :
852 413279 : if (dst.vnn != msg_ctx->id.vnn) {
853 0 : if (num_fds > 0) {
854 0 : return ENOSYS;
855 : }
856 :
857 0 : ret = messaging_ctdb_send(dst.vnn, dst.pid, iov2, iovlen+1);
858 0 : return ret;
859 : }
860 :
861 413279 : ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
862 :
863 413279 : if (ret == EACCES) {
864 0 : become_root();
865 0 : ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
866 : fds, num_fds);
867 0 : unbecome_root();
868 : }
869 :
870 413279 : if (ret == ECONNREFUSED) {
871 : /*
872 : * Linux returns this when a socket exists in the file
873 : * system without a listening process. This is not
874 : * documented in susv4 or the linux manpages, but it's
875 : * easily testable. For the higher levels this is the
876 : * same as "destination does not exist"
877 : */
878 4909 : ret = ENOENT;
879 : }
880 :
881 411962 : return ret;
882 : }
883 :
884 585858 : NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
885 : struct server_id dst, uint32_t msg_type,
886 : const struct iovec *iov, int iovlen,
887 : const int *fds, size_t num_fds)
888 : {
889 1686 : int ret;
890 :
891 585858 : ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
892 : iov, iovlen, fds, num_fds);
893 585858 : if (ret != 0) {
894 4911 : return map_nt_error_from_unix(ret);
895 : }
896 580947 : return NT_STATUS_OK;
897 : }
898 :
899 : struct send_all_state {
900 : struct messaging_context *msg_ctx;
901 : int msg_type;
902 : const void *buf;
903 : size_t len;
904 : };
905 :
906 12822 : static int send_all_fn(pid_t pid, void *private_data)
907 : {
908 12822 : struct send_all_state *state = private_data;
909 6 : NTSTATUS status;
910 :
911 12822 : if (pid == tevent_cached_getpid()) {
912 225 : DBG_DEBUG("Skip ourselves in messaging_send_all\n");
913 225 : return 0;
914 : }
915 :
916 12597 : status = messaging_send_buf(state->msg_ctx, pid_to_procid(pid),
917 12597 : state->msg_type, state->buf, state->len);
918 12597 : if (!NT_STATUS_IS_OK(status)) {
919 4909 : DBG_NOTICE("messaging_send_buf to %ju failed: %s\n",
920 : (uintmax_t)pid, nt_errstr(status));
921 : }
922 :
923 12592 : return 0;
924 : }
925 :
926 225 : void messaging_send_all(struct messaging_context *msg_ctx,
927 : int msg_type, const void *buf, size_t len)
928 : {
929 225 : struct send_all_state state = {
930 : .msg_ctx = msg_ctx, .msg_type = msg_type,
931 : .buf = buf, .len = len
932 : };
933 1 : int ret;
934 :
935 : #ifdef CLUSTER_SUPPORT
936 : if (lp_clustering()) {
937 : struct ctdbd_connection *conn = messaging_ctdb_connection();
938 : uint8_t msghdr[MESSAGE_HDR_LENGTH];
939 : struct iovec iov[] = {
940 : { .iov_base = msghdr,
941 : .iov_len = sizeof(msghdr) },
942 : { .iov_base = discard_const_p(void, buf),
943 : .iov_len = len }
944 : };
945 :
946 : message_hdr_put(msghdr, msg_type, messaging_server_id(msg_ctx),
947 : (struct server_id) {0});
948 :
949 : ret = ctdbd_messaging_send_iov(
950 : conn, CTDB_BROADCAST_CONNECTED,
951 : CTDB_SRVID_SAMBA_PROCESS,
952 : iov, ARRAY_SIZE(iov));
953 : if (ret != 0) {
954 : DBG_WARNING("ctdbd_messaging_send_iov failed: %s\n",
955 : strerror(ret));
956 : }
957 :
958 : return;
959 : }
960 : #endif
961 :
962 225 : ret = messaging_dgm_forall(send_all_fn, &state);
963 225 : if (ret != 0) {
964 0 : DBG_WARNING("messaging_dgm_forall failed: %s\n",
965 : strerror(ret));
966 : }
967 225 : }
968 :
969 177120 : static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
970 : struct messaging_rec *rec)
971 : {
972 597 : struct messaging_rec *result;
973 177120 : size_t fds_size = sizeof(int64_t) * rec->num_fds;
974 597 : size_t payload_len;
975 :
976 177120 : payload_len = rec->buf.length + fds_size;
977 177120 : if (payload_len < rec->buf.length) {
978 : /* overflow */
979 0 : return NULL;
980 : }
981 :
982 177120 : result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
983 : payload_len);
984 177120 : if (result == NULL) {
985 0 : return NULL;
986 : }
987 177120 : *result = *rec;
988 :
989 : /* Doesn't fail, see talloc_pooled_object */
990 :
991 177120 : result->buf.data = talloc_memdup(result, rec->buf.data,
992 : rec->buf.length);
993 :
994 177120 : result->fds = NULL;
995 177120 : if (result->num_fds > 0) {
996 56 : size_t i;
997 :
998 1110 : result->fds = talloc_memdup(result, rec->fds, fds_size);
999 :
1000 2223 : for (i=0; i<rec->num_fds; i++) {
1001 : /*
1002 : * fd's can only exist once
1003 : */
1004 1113 : rec->fds[i] = -1;
1005 : }
1006 : }
1007 :
1008 176523 : return result;
1009 : }
1010 :
1011 : struct messaging_filtered_read_state {
1012 : struct tevent_context *ev;
1013 : struct messaging_context *msg_ctx;
1014 : struct messaging_dgm_fde *fde;
1015 : struct messaging_ctdb_fde *cluster_fde;
1016 :
1017 : bool (*filter)(struct messaging_rec *rec, void *private_data);
1018 : void *private_data;
1019 :
1020 : struct messaging_rec *rec;
1021 : };
1022 :
1023 : static void messaging_filtered_read_cleanup(struct tevent_req *req,
1024 : enum tevent_req_state req_state);
1025 :
1026 139816 : struct tevent_req *messaging_filtered_read_send(
1027 : TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1028 : struct messaging_context *msg_ctx,
1029 : bool (*filter)(struct messaging_rec *rec, void *private_data),
1030 : void *private_data)
1031 : {
1032 3879 : struct tevent_req *req;
1033 3879 : struct messaging_filtered_read_state *state;
1034 3879 : size_t new_waiters_len;
1035 3879 : bool ok;
1036 :
1037 139816 : req = tevent_req_create(mem_ctx, &state,
1038 : struct messaging_filtered_read_state);
1039 139816 : if (req == NULL) {
1040 0 : return NULL;
1041 : }
1042 139816 : state->ev = ev;
1043 139816 : state->msg_ctx = msg_ctx;
1044 139816 : state->filter = filter;
1045 139816 : state->private_data = private_data;
1046 :
1047 : /*
1048 : * We have to defer the callback here, as we might be called from
1049 : * within a different tevent_context than state->ev
1050 : */
1051 139816 : tevent_req_defer_callback(req, state->ev);
1052 :
1053 139816 : state->fde = messaging_dgm_register_tevent_context(state, ev);
1054 139816 : if (tevent_req_nomem(state->fde, req)) {
1055 0 : return tevent_req_post(req, ev);
1056 : }
1057 :
1058 139816 : if (lp_clustering()) {
1059 0 : state->cluster_fde =
1060 0 : messaging_ctdb_register_tevent_context(state, ev);
1061 0 : if (tevent_req_nomem(state->cluster_fde, req)) {
1062 0 : return tevent_req_post(req, ev);
1063 : }
1064 : }
1065 :
1066 : /*
1067 : * We add ourselves to the "new_waiters" array, not the "waiters"
1068 : * array. If we are called from within messaging_read_done,
1069 : * messaging_dispatch_rec will be in an active for-loop on
1070 : * "waiters". We must be careful not to mess with this array, because
1071 : * it could mean that a single event is being delivered twice.
1072 : */
1073 :
1074 139816 : new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
1075 :
1076 139816 : if (new_waiters_len == msg_ctx->num_new_waiters) {
1077 3813 : struct tevent_req **tmp;
1078 :
1079 119236 : tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
1080 : struct tevent_req *, new_waiters_len+1);
1081 119236 : if (tevent_req_nomem(tmp, req)) {
1082 0 : return tevent_req_post(req, ev);
1083 : }
1084 119236 : msg_ctx->new_waiters = tmp;
1085 : }
1086 :
1087 139816 : msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
1088 139816 : msg_ctx->num_new_waiters += 1;
1089 139816 : tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
1090 :
1091 139816 : ok = messaging_register_event_context(msg_ctx, ev);
1092 139816 : if (!ok) {
1093 0 : tevent_req_oom(req);
1094 0 : return tevent_req_post(req, ev);
1095 : }
1096 :
1097 135937 : return req;
1098 : }
1099 :
1100 133325 : static void messaging_filtered_read_cleanup(struct tevent_req *req,
1101 : enum tevent_req_state req_state)
1102 : {
1103 133325 : struct messaging_filtered_read_state *state = tevent_req_data(
1104 : req, struct messaging_filtered_read_state);
1105 133325 : struct messaging_context *msg_ctx = state->msg_ctx;
1106 3744 : size_t i;
1107 3744 : bool ok;
1108 :
1109 133325 : tevent_req_set_cleanup_fn(req, NULL);
1110 :
1111 133325 : TALLOC_FREE(state->fde);
1112 133325 : TALLOC_FREE(state->cluster_fde);
1113 :
1114 133325 : ok = messaging_deregister_event_context(msg_ctx, state->ev);
1115 133325 : if (!ok) {
1116 0 : abort();
1117 : }
1118 :
1119 : /*
1120 : * Just set the [new_]waiters entry to NULL, be careful not to mess
1121 : * with the other "waiters" array contents. We are often called from
1122 : * within "messaging_dispatch_rec", which loops over
1123 : * "waiters". Messing with the "waiters" array will mess up that
1124 : * for-loop.
1125 : */
1126 :
1127 195001 : for (i=0; i<msg_ctx->num_waiters; i++) {
1128 82530 : if (msg_ctx->waiters[i] == req) {
1129 20854 : msg_ctx->waiters[i] = NULL;
1130 20854 : return;
1131 : }
1132 : }
1133 :
1134 441385 : for (i=0; i<msg_ctx->num_new_waiters; i++) {
1135 441385 : if (msg_ctx->new_waiters[i] == req) {
1136 112471 : msg_ctx->new_waiters[i] = NULL;
1137 112471 : return;
1138 : }
1139 : }
1140 : }
1141 :
1142 4271 : static void messaging_filtered_read_done(struct tevent_req *req,
1143 : struct messaging_rec *rec)
1144 : {
1145 4271 : struct messaging_filtered_read_state *state = tevent_req_data(
1146 : req, struct messaging_filtered_read_state);
1147 :
1148 4271 : state->rec = messaging_rec_dup(state, rec);
1149 4271 : if (tevent_req_nomem(state->rec, req)) {
1150 0 : return;
1151 : }
1152 4271 : tevent_req_done(req);
1153 : }
1154 :
1155 4271 : int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1156 : struct messaging_rec **presult)
1157 : {
1158 4271 : struct messaging_filtered_read_state *state = tevent_req_data(
1159 : req, struct messaging_filtered_read_state);
1160 228 : int err;
1161 :
1162 4271 : if (tevent_req_is_unix_error(req, &err)) {
1163 0 : tevent_req_received(req);
1164 0 : return err;
1165 : }
1166 4271 : if (presult != NULL) {
1167 4271 : *presult = talloc_move(mem_ctx, &state->rec);
1168 : }
1169 4271 : tevent_req_received(req);
1170 4271 : return 0;
1171 : }
1172 :
1173 : struct messaging_read_state {
1174 : uint32_t msg_type;
1175 : struct messaging_rec *rec;
1176 : };
1177 :
1178 : static bool messaging_read_filter(struct messaging_rec *rec,
1179 : void *private_data);
1180 : static void messaging_read_done(struct tevent_req *subreq);
1181 :
1182 31388 : struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
1183 : struct tevent_context *ev,
1184 : struct messaging_context *msg,
1185 : uint32_t msg_type)
1186 : {
1187 969 : struct tevent_req *req, *subreq;
1188 969 : struct messaging_read_state *state;
1189 :
1190 31388 : req = tevent_req_create(mem_ctx, &state,
1191 : struct messaging_read_state);
1192 31388 : if (req == NULL) {
1193 0 : return NULL;
1194 : }
1195 31388 : state->msg_type = msg_type;
1196 :
1197 31388 : subreq = messaging_filtered_read_send(state, ev, msg,
1198 : messaging_read_filter, state);
1199 31388 : if (tevent_req_nomem(subreq, req)) {
1200 0 : return tevent_req_post(req, ev);
1201 : }
1202 31388 : tevent_req_set_callback(subreq, messaging_read_done, req);
1203 31388 : return req;
1204 : }
1205 :
1206 25265 : static bool messaging_read_filter(struct messaging_rec *rec,
1207 : void *private_data)
1208 : {
1209 25265 : struct messaging_read_state *state = talloc_get_type_abort(
1210 : private_data, struct messaging_read_state);
1211 :
1212 25265 : if (rec->num_fds != 0) {
1213 696 : return false;
1214 : }
1215 :
1216 24567 : return rec->msg_type == state->msg_type;
1217 : }
1218 :
1219 168 : static void messaging_read_done(struct tevent_req *subreq)
1220 : {
1221 168 : struct tevent_req *req = tevent_req_callback_data(
1222 : subreq, struct tevent_req);
1223 168 : struct messaging_read_state *state = tevent_req_data(
1224 : req, struct messaging_read_state);
1225 112 : int ret;
1226 :
1227 168 : ret = messaging_filtered_read_recv(subreq, state, &state->rec);
1228 168 : TALLOC_FREE(subreq);
1229 168 : if (tevent_req_error(req, ret)) {
1230 0 : return;
1231 : }
1232 168 : tevent_req_done(req);
1233 : }
1234 :
1235 168 : int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1236 : struct messaging_rec **presult)
1237 : {
1238 168 : struct messaging_read_state *state = tevent_req_data(
1239 : req, struct messaging_read_state);
1240 112 : int err;
1241 :
1242 168 : if (tevent_req_is_unix_error(req, &err)) {
1243 0 : return err;
1244 : }
1245 168 : if (presult != NULL) {
1246 66 : *presult = talloc_move(mem_ctx, &state->rec);
1247 : }
1248 56 : return 0;
1249 : }
1250 :
1251 64516 : static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
1252 : {
1253 64516 : if (msg_ctx->num_new_waiters == 0) {
1254 59464 : return true;
1255 : }
1256 :
1257 5048 : if (talloc_array_length(msg_ctx->waiters) <
1258 5048 : (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
1259 217 : struct tevent_req **tmp;
1260 3078 : tmp = talloc_realloc(
1261 : msg_ctx, msg_ctx->waiters, struct tevent_req *,
1262 : msg_ctx->num_waiters + msg_ctx->num_new_waiters);
1263 3078 : if (tmp == NULL) {
1264 0 : DEBUG(1, ("%s: talloc failed\n", __func__));
1265 0 : return false;
1266 : }
1267 3078 : msg_ctx->waiters = tmp;
1268 : }
1269 :
1270 5048 : memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
1271 5048 : sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
1272 :
1273 5048 : msg_ctx->num_waiters += msg_ctx->num_new_waiters;
1274 5048 : msg_ctx->num_new_waiters = 0;
1275 :
1276 5048 : return true;
1277 : }
1278 :
1279 257826 : static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
1280 : struct messaging_rec *rec)
1281 : {
1282 710 : struct messaging_callback *cb, *next;
1283 :
1284 1429003 : for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
1285 5411 : size_t j;
1286 :
1287 1364781 : next = cb->next;
1288 1364781 : if (cb->msg_type != rec->msg_type) {
1289 1171177 : continue;
1290 : }
1291 :
1292 : /*
1293 : * the old style callbacks don't support fd passing
1294 : */
1295 193604 : for (j=0; j < rec->num_fds; j++) {
1296 0 : int fd = rec->fds[j];
1297 0 : close(fd);
1298 : }
1299 193604 : rec->num_fds = 0;
1300 193604 : rec->fds = NULL;
1301 :
1302 193604 : cb->fn(msg_ctx, cb->private_data, rec->msg_type,
1303 : rec->src, &rec->buf);
1304 :
1305 193600 : return true;
1306 : }
1307 :
1308 63987 : return false;
1309 : }
1310 :
1311 64516 : static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
1312 : struct tevent_context *ev,
1313 : struct messaging_rec *rec)
1314 : {
1315 235 : size_t i;
1316 :
1317 64516 : if (!messaging_append_new_waiters(msg_ctx)) {
1318 0 : return false;
1319 : }
1320 :
1321 64281 : i = 0;
1322 287416 : while (i < msg_ctx->num_waiters) {
1323 889 : struct tevent_req *req;
1324 889 : struct messaging_filtered_read_state *state;
1325 :
1326 227171 : req = msg_ctx->waiters[i];
1327 227171 : if (req == NULL) {
1328 : /*
1329 : * This got cleaned up. In the meantime,
1330 : * move everything down one. We need
1331 : * to keep the order of waiters, as
1332 : * other code may depend on this.
1333 : */
1334 6282 : ARRAY_DEL_ELEMENT(
1335 227 : msg_ctx->waiters, i, msg_ctx->num_waiters);
1336 6282 : msg_ctx->num_waiters -= 1;
1337 6282 : continue;
1338 : }
1339 :
1340 220889 : state = tevent_req_data(
1341 : req, struct messaging_filtered_read_state);
1342 440636 : if ((ev == state->ev) &&
1343 219747 : state->filter(rec, state->private_data)) {
1344 4271 : messaging_filtered_read_done(req, rec);
1345 4271 : return true;
1346 : }
1347 :
1348 216618 : i += 1;
1349 : }
1350 :
1351 60238 : return false;
1352 : }
1353 :
1354 : /*
1355 : Dispatch one messaging_rec
1356 : */
1357 85326 : static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1358 : struct tevent_context *ev,
1359 : struct messaging_rec *rec)
1360 : {
1361 344 : bool consumed;
1362 344 : size_t i;
1363 :
1364 85326 : if (ev == msg_ctx->event_ctx) {
1365 85032 : consumed = messaging_dispatch_classic(msg_ctx, rec);
1366 85032 : if (consumed) {
1367 42729 : return;
1368 : }
1369 : }
1370 :
1371 42472 : consumed = messaging_dispatch_waiters(msg_ctx, ev, rec);
1372 42472 : if (consumed) {
1373 2613 : return;
1374 : }
1375 :
1376 39646 : if (ev != msg_ctx->event_ctx) {
1377 0 : struct iovec iov;
1378 270 : int fds[MAX(1, rec->num_fds)];
1379 0 : int ret;
1380 :
1381 : /*
1382 : * We've been listening on a nested event
1383 : * context. Messages need to be handled in the main
1384 : * event context, so post to ourselves
1385 : */
1386 :
1387 270 : iov.iov_base = rec->buf.data;
1388 270 : iov.iov_len = rec->buf.length;
1389 :
1390 270 : for (i=0; i<rec->num_fds; i++) {
1391 0 : fds[i] = rec->fds[i];
1392 : }
1393 :
1394 270 : ret = messaging_post_self(
1395 270 : msg_ctx, rec->src, rec->dest, rec->msg_type,
1396 270 : &iov, 1, fds, rec->num_fds);
1397 270 : if (ret == 0) {
1398 270 : return;
1399 : }
1400 : }
1401 : }
1402 :
1403 : static int mess_parent_dgm_cleanup(void *private_data);
1404 : static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1405 :
1406 88 : bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1407 : {
1408 0 : struct tevent_req *req;
1409 :
1410 88 : req = background_job_send(
1411 : msg, msg->event_ctx, msg, NULL, 0,
1412 88 : lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1413 : 60*15),
1414 : mess_parent_dgm_cleanup, msg);
1415 88 : if (req == NULL) {
1416 0 : DBG_WARNING("background_job_send failed\n");
1417 0 : return false;
1418 : }
1419 88 : tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1420 88 : return true;
1421 : }
1422 :
1423 0 : static int mess_parent_dgm_cleanup(void *private_data)
1424 : {
1425 0 : int ret;
1426 :
1427 0 : ret = messaging_dgm_wipe();
1428 0 : DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1429 : ret ? strerror(ret) : "ok"));
1430 0 : return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1431 : 60*15);
1432 : }
1433 :
1434 0 : static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1435 : {
1436 0 : struct messaging_context *msg = tevent_req_callback_data(
1437 : req, struct messaging_context);
1438 0 : NTSTATUS status;
1439 :
1440 0 : status = background_job_recv(req);
1441 0 : TALLOC_FREE(req);
1442 0 : DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1443 : nt_errstr(status)));
1444 :
1445 0 : req = background_job_send(
1446 : msg, msg->event_ctx, msg, NULL, 0,
1447 0 : lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1448 : 60*15),
1449 : mess_parent_dgm_cleanup, msg);
1450 0 : if (req == NULL) {
1451 0 : DEBUG(1, ("background_job_send failed\n"));
1452 0 : return;
1453 : }
1454 0 : tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1455 : }
1456 :
1457 0 : int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1458 : {
1459 0 : int ret;
1460 :
1461 0 : if (pid == 0) {
1462 0 : ret = messaging_dgm_wipe();
1463 : } else {
1464 0 : ret = messaging_dgm_cleanup(pid);
1465 : }
1466 :
1467 0 : return ret;
1468 : }
1469 :
1470 75469 : struct tevent_context *messaging_tevent_context(
1471 : struct messaging_context *msg_ctx)
1472 : {
1473 75469 : return msg_ctx->event_ctx;
1474 : }
1475 :
1476 21097 : struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1477 : {
1478 21097 : return msg_ctx->names_db;
1479 : }
1480 :
1481 : /** @} **/
|