Line data Source code
1 : /*
2 : * Unix SMB/CIFS implementation.
3 : *
4 : * Copyright (C) Volker Lendecke 2014
5 : *
6 : * This program is free software; you can redistribute it and/or modify
7 : * it under the terms of the GNU General Public License as published by
8 : * the Free Software Foundation; either version 3 of the License, or
9 : * (at your option) any later version.
10 : *
11 : * This program is distributed in the hope that it will be useful,
12 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : * GNU General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public License
17 : * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 : */
19 :
20 : #include "replace.h"
21 : #include <tevent.h>
22 : #include "notifyd_private.h"
23 : #include "lib/util/server_id.h"
24 : #include "lib/util/data_blob.h"
25 : #include "librpc/gen_ndr/notify.h"
26 : #include "librpc/gen_ndr/messaging.h"
27 : #include "librpc/gen_ndr/server_id.h"
28 : #include "lib/dbwrap/dbwrap.h"
29 : #include "lib/dbwrap/dbwrap_rbt.h"
30 : #include "messages.h"
31 : #include "tdb.h"
32 : #include "util_tdb.h"
33 : #include "notifyd.h"
34 : #include "lib/util/server_id_db.h"
35 : #include "lib/util/tevent_unix.h"
36 : #include "lib/util/tevent_ntstatus.h"
37 : #include "ctdbd_conn.h"
38 : #include "ctdb_srvids.h"
39 : #include "server_id_db_util.h"
40 : #include "lib/util/iov_buf.h"
41 : #include "messages_util.h"
42 :
43 : #ifdef CLUSTER_SUPPORT
44 : #include "ctdb_protocol.h"
45 : #endif
46 :
47 : struct notifyd_peer;
48 :
49 : /*
50 : * All of notifyd's state
51 : */
52 :
53 : struct notifyd_state {
54 : struct tevent_context *ev;
55 : struct messaging_context *msg_ctx;
56 : struct ctdbd_connection *ctdbd_conn;
57 :
58 : /*
59 : * Database of everything clients show interest in. Indexed by
60 : * absolute path. The database keys are not 0-terminated
61 : * to allow the critical operation, notifyd_trigger, to walk
62 : * the structure from the top without adding intermediate 0s.
63 : * The database records contain an array of
64 : *
65 : * struct notifyd_instance
66 : *
67 : * to be maintained and parsed by notifyd_parse_entry()
68 : */
69 : struct db_context *entries;
70 :
71 : /*
72 : * In the cluster case, this is the place where we store a log
73 : * of all MSG_SMB_NOTIFY_REC_CHANGE messages. We just 1:1
74 : * forward them to our peer notifyd's in the cluster once a
75 : * second or when the log grows too large.
76 : */
77 :
78 : struct messaging_reclog *log;
79 :
80 : /*
81 : * Array of companion notifyd's in a cluster. Every notifyd
82 : * broadcasts its messaging_reclog to every other notifyd in
83 : * the cluster. This is done by making ctdb send a message to
84 : * srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node
85 : * number CTDB_BROADCAST_CONNECTED. Everybody in the cluster who
86 : * had called register_with_ctdbd this srvid will receive the
87 : * broadcasts.
88 : *
89 : * Database replication happens via these broadcasts. Also,
90 : * they serve as liveness indication. If a notifyd receives a
91 : * broadcast from an unknown peer, it will create one for this
92 : * srvid. Also when we don't hear anything from a peer for a
93 : * while, we will discard it.
94 : */
95 :
96 : struct notifyd_peer **peers;
97 : size_t num_peers;
98 :
99 : sys_notify_watch_fn sys_notify_watch;
100 : struct sys_notify_context *sys_notify_ctx;
101 : };
102 :
103 : struct notifyd_peer {
104 : struct notifyd_state *state;
105 : struct server_id pid;
106 : uint64_t rec_index;
107 : struct db_context *db;
108 : time_t last_broadcast;
109 : };
110 :
111 : static void notifyd_rec_change(struct messaging_context *msg_ctx,
112 : void *private_data, uint32_t msg_type,
113 : struct server_id src, DATA_BLOB *data);
114 : static void notifyd_trigger(struct messaging_context *msg_ctx,
115 : void *private_data, uint32_t msg_type,
116 : struct server_id src, DATA_BLOB *data);
117 : static void notifyd_get_db(struct messaging_context *msg_ctx,
118 : void *private_data, uint32_t msg_type,
119 : struct server_id src, DATA_BLOB *data);
120 :
121 : #ifdef CLUSTER_SUPPORT
122 : static void notifyd_got_db(struct messaging_context *msg_ctx,
123 : void *private_data, uint32_t msg_type,
124 : struct server_id src, DATA_BLOB *data);
125 : static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
126 : struct server_id src,
127 : struct messaging_reclog *log);
128 : #endif
129 : static void notifyd_sys_callback(struct sys_notify_context *ctx,
130 : void *private_data, struct notify_event *ev,
131 : uint32_t filter);
132 :
133 : #ifdef CLUSTER_SUPPORT
134 : static struct tevent_req *notifyd_broadcast_reclog_send(
135 : TALLOC_CTX *mem_ctx, struct tevent_context *ev,
136 : struct ctdbd_connection *ctdbd_conn, struct server_id src,
137 : struct messaging_reclog *log);
138 : static int notifyd_broadcast_reclog_recv(struct tevent_req *req);
139 :
140 : static struct tevent_req *notifyd_clean_peers_send(
141 : TALLOC_CTX *mem_ctx, struct tevent_context *ev,
142 : struct notifyd_state *notifyd);
143 : static int notifyd_clean_peers_recv(struct tevent_req *req);
144 : #endif
145 :
146 0 : static int sys_notify_watch_dummy(
147 : TALLOC_CTX *mem_ctx,
148 : struct sys_notify_context *ctx,
149 : const char *path,
150 : uint32_t *filter,
151 : uint32_t *subdir_filter,
152 : void (*callback)(struct sys_notify_context *ctx,
153 : void *private_data,
154 : struct notify_event *ev,
155 : uint32_t filter),
156 : void *private_data,
157 : void *handle_p)
158 : {
159 0 : void **handle = handle_p;
160 0 : *handle = NULL;
161 0 : return 0;
162 : }
163 :
164 : #ifdef CLUSTER_SUPPORT
165 : static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq);
166 : static void notifyd_clean_peers_finished(struct tevent_req *subreq);
167 : static int notifyd_snoop_broadcast(struct tevent_context *ev,
168 : uint32_t src_vnn, uint32_t dst_vnn,
169 : uint64_t dst_srvid,
170 : const uint8_t *msg, size_t msglen,
171 : void *private_data);
172 : #endif
173 :
174 0 : struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
175 : struct messaging_context *msg_ctx,
176 : struct ctdbd_connection *ctdbd_conn,
177 : sys_notify_watch_fn sys_notify_watch,
178 : struct sys_notify_context *sys_notify_ctx)
179 : {
180 0 : struct tevent_req *req;
181 : #ifdef CLUSTER_SUPPORT
182 : struct tevent_req *subreq;
183 : #endif
184 0 : struct notifyd_state *state;
185 0 : struct server_id_db *names_db;
186 0 : NTSTATUS status;
187 0 : int ret;
188 :
189 0 : req = tevent_req_create(mem_ctx, &state, struct notifyd_state);
190 0 : if (req == NULL) {
191 0 : return NULL;
192 : }
193 0 : state->ev = ev;
194 0 : state->msg_ctx = msg_ctx;
195 0 : state->ctdbd_conn = ctdbd_conn;
196 :
197 0 : if (sys_notify_watch == NULL) {
198 0 : sys_notify_watch = sys_notify_watch_dummy;
199 : }
200 :
201 0 : state->sys_notify_watch = sys_notify_watch;
202 0 : state->sys_notify_ctx = sys_notify_ctx;
203 :
204 0 : state->entries = db_open_rbt(state);
205 0 : if (tevent_req_nomem(state->entries, req)) {
206 0 : return tevent_req_post(req, ev);
207 : }
208 :
209 0 : status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_REC_CHANGE,
210 : notifyd_rec_change);
211 0 : if (tevent_req_nterror(req, status)) {
212 0 : return tevent_req_post(req, ev);
213 : }
214 :
215 0 : status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_TRIGGER,
216 : notifyd_trigger);
217 0 : if (tevent_req_nterror(req, status)) {
218 0 : goto deregister_rec_change;
219 : }
220 :
221 0 : status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_GET_DB,
222 : notifyd_get_db);
223 0 : if (tevent_req_nterror(req, status)) {
224 0 : goto deregister_trigger;
225 : }
226 :
227 0 : names_db = messaging_names_db(msg_ctx);
228 :
229 0 : ret = server_id_db_set_exclusive(names_db, "notify-daemon");
230 0 : if (ret != 0) {
231 0 : DBG_DEBUG("server_id_db_set_exclusive() failed: %s\n",
232 : strerror(ret));
233 0 : tevent_req_error(req, ret);
234 0 : goto deregister_get_db;
235 : }
236 :
237 0 : if (ctdbd_conn == NULL) {
238 : /*
239 : * No cluster around, skip the database replication
240 : * engine
241 : */
242 0 : return req;
243 : }
244 :
245 : #ifdef CLUSTER_SUPPORT
246 : status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_DB,
247 : notifyd_got_db);
248 : if (tevent_req_nterror(req, status)) {
249 : goto deregister_get_db;
250 : }
251 :
252 : state->log = talloc_zero(state, struct messaging_reclog);
253 : if (tevent_req_nomem(state->log, req)) {
254 : goto deregister_db;
255 : }
256 :
257 : subreq = notifyd_broadcast_reclog_send(
258 : state->log, ev, ctdbd_conn,
259 : messaging_server_id(msg_ctx),
260 : state->log);
261 : if (tevent_req_nomem(subreq, req)) {
262 : goto deregister_db;
263 : }
264 : tevent_req_set_callback(subreq,
265 : notifyd_broadcast_reclog_finished,
266 : req);
267 :
268 : subreq = notifyd_clean_peers_send(state, ev, state);
269 : if (tevent_req_nomem(subreq, req)) {
270 : goto deregister_db;
271 : }
272 : tevent_req_set_callback(subreq, notifyd_clean_peers_finished,
273 : req);
274 :
275 : ret = register_with_ctdbd(ctdbd_conn,
276 : CTDB_SRVID_SAMBA_NOTIFY_PROXY,
277 : notifyd_snoop_broadcast, state);
278 : if (ret != 0) {
279 : tevent_req_error(req, ret);
280 : goto deregister_db;
281 : }
282 : #endif
283 :
284 0 : return req;
285 :
286 : #ifdef CLUSTER_SUPPORT
287 : deregister_db:
288 : messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_DB, state);
289 : #endif
290 0 : deregister_get_db:
291 0 : messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_GET_DB, state);
292 0 : deregister_trigger:
293 0 : messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_TRIGGER, state);
294 0 : deregister_rec_change:
295 0 : messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_REC_CHANGE, state);
296 0 : return tevent_req_post(req, ev);
297 : }
298 :
299 : #ifdef CLUSTER_SUPPORT
300 :
301 : static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq)
302 : {
303 : struct tevent_req *req = tevent_req_callback_data(
304 : subreq, struct tevent_req);
305 : int ret;
306 :
307 : ret = notifyd_broadcast_reclog_recv(subreq);
308 : TALLOC_FREE(subreq);
309 : tevent_req_error(req, ret);
310 : }
311 :
312 : static void notifyd_clean_peers_finished(struct tevent_req *subreq)
313 : {
314 : struct tevent_req *req = tevent_req_callback_data(
315 : subreq, struct tevent_req);
316 : int ret;
317 :
318 : ret = notifyd_clean_peers_recv(subreq);
319 : TALLOC_FREE(subreq);
320 : tevent_req_error(req, ret);
321 : }
322 :
323 : #endif
324 :
325 0 : int notifyd_recv(struct tevent_req *req)
326 : {
327 0 : return tevent_req_simple_recv_unix(req);
328 : }
329 :
330 0 : static bool notifyd_apply_rec_change(
331 : const struct server_id *client,
332 : const char *path, size_t pathlen,
333 : const struct notify_instance *chg,
334 : struct db_context *entries,
335 : sys_notify_watch_fn sys_notify_watch,
336 : struct sys_notify_context *sys_notify_ctx,
337 : struct messaging_context *msg_ctx)
338 : {
339 0 : struct db_record *rec = NULL;
340 0 : struct notifyd_instance *instances = NULL;
341 0 : size_t num_instances;
342 0 : size_t i;
343 0 : struct notifyd_instance *instance = NULL;
344 0 : TDB_DATA value;
345 0 : NTSTATUS status;
346 0 : bool ok = false;
347 :
348 0 : if (pathlen == 0) {
349 0 : DBG_WARNING("pathlen==0\n");
350 0 : return false;
351 : }
352 0 : if (path[pathlen-1] != '\0') {
353 0 : DBG_WARNING("path not 0-terminated\n");
354 0 : return false;
355 : }
356 :
357 0 : DBG_DEBUG("path=%s, filter=%"PRIu32", subdir_filter=%"PRIu32", "
358 : "private_data=%p\n",
359 : path,
360 : chg->filter,
361 : chg->subdir_filter,
362 : chg->private_data);
363 :
364 0 : rec = dbwrap_fetch_locked(
365 : entries, entries,
366 : make_tdb_data((const uint8_t *)path, pathlen-1));
367 :
368 0 : if (rec == NULL) {
369 0 : DBG_WARNING("dbwrap_fetch_locked failed\n");
370 0 : goto fail;
371 : }
372 :
373 0 : num_instances = 0;
374 0 : value = dbwrap_record_get_value(rec);
375 :
376 0 : if (value.dsize != 0) {
377 0 : if (!notifyd_parse_entry(value.dptr, value.dsize, NULL,
378 : &num_instances)) {
379 0 : goto fail;
380 : }
381 : }
382 :
383 : /*
384 : * Overallocate by one instance to avoid a realloc when adding
385 : */
386 0 : instances = talloc_array(rec, struct notifyd_instance,
387 : num_instances + 1);
388 0 : if (instances == NULL) {
389 0 : DBG_WARNING("talloc failed\n");
390 0 : goto fail;
391 : }
392 :
393 0 : if (value.dsize != 0) {
394 0 : memcpy(instances, value.dptr, value.dsize);
395 : }
396 :
397 0 : for (i=0; i<num_instances; i++) {
398 0 : instance = &instances[i];
399 :
400 0 : if (server_id_equal(&instance->client, client) &&
401 0 : (instance->instance.private_data == chg->private_data)) {
402 0 : break;
403 : }
404 : }
405 :
406 0 : if (i < num_instances) {
407 0 : instance->instance = *chg;
408 : } else {
409 : /*
410 : * We've overallocated for one instance
411 : */
412 0 : instance = &instances[num_instances];
413 :
414 0 : *instance = (struct notifyd_instance) {
415 0 : .client = *client,
416 0 : .instance = *chg,
417 0 : .internal_filter = chg->filter,
418 0 : .internal_subdir_filter = chg->subdir_filter
419 : };
420 :
421 0 : num_instances += 1;
422 : }
423 :
424 0 : if ((instance->instance.filter != 0) ||
425 0 : (instance->instance.subdir_filter != 0)) {
426 0 : int ret;
427 :
428 0 : TALLOC_FREE(instance->sys_watch);
429 :
430 0 : ret = sys_notify_watch(entries, sys_notify_ctx, path,
431 : &instance->internal_filter,
432 : &instance->internal_subdir_filter,
433 : notifyd_sys_callback, msg_ctx,
434 0 : &instance->sys_watch);
435 0 : if (ret != 0) {
436 0 : DBG_WARNING("sys_notify_watch for [%s] returned %s\n",
437 : path, strerror(errno));
438 : }
439 : }
440 :
441 0 : if ((instance->instance.filter == 0) &&
442 0 : (instance->instance.subdir_filter == 0)) {
443 : /* This is a delete request */
444 0 : TALLOC_FREE(instance->sys_watch);
445 0 : *instance = instances[num_instances-1];
446 0 : num_instances -= 1;
447 : }
448 :
449 0 : DBG_DEBUG("%s has %zu instances\n", path, num_instances);
450 :
451 0 : if (num_instances == 0) {
452 0 : status = dbwrap_record_delete(rec);
453 0 : if (!NT_STATUS_IS_OK(status)) {
454 0 : DBG_WARNING("dbwrap_record_delete returned %s\n",
455 : nt_errstr(status));
456 0 : goto fail;
457 : }
458 : } else {
459 0 : value = make_tdb_data(
460 : (uint8_t *)instances,
461 : sizeof(struct notifyd_instance) * num_instances);
462 :
463 0 : status = dbwrap_record_store(rec, value, 0);
464 0 : if (!NT_STATUS_IS_OK(status)) {
465 0 : DBG_WARNING("dbwrap_record_store returned %s\n",
466 : nt_errstr(status));
467 0 : goto fail;
468 : }
469 : }
470 :
471 0 : ok = true;
472 0 : fail:
473 0 : TALLOC_FREE(rec);
474 0 : return ok;
475 : }
476 :
477 0 : static void notifyd_sys_callback(struct sys_notify_context *ctx,
478 : void *private_data, struct notify_event *ev,
479 : uint32_t filter)
480 : {
481 0 : struct messaging_context *msg_ctx = talloc_get_type_abort(
482 : private_data, struct messaging_context);
483 0 : struct notify_trigger_msg msg;
484 0 : struct iovec iov[4];
485 0 : char slash = '/';
486 :
487 0 : msg = (struct notify_trigger_msg) {
488 0 : .when = timespec_current(),
489 0 : .action = ev->action,
490 : .filter = filter,
491 : };
492 :
493 0 : iov[0].iov_base = &msg;
494 0 : iov[0].iov_len = offsetof(struct notify_trigger_msg, path);
495 0 : iov[1].iov_base = discard_const_p(char, ev->dir);
496 0 : iov[1].iov_len = strlen(ev->dir);
497 0 : iov[2].iov_base = &slash;
498 0 : iov[2].iov_len = 1;
499 0 : iov[3].iov_base = discard_const_p(char, ev->path);
500 0 : iov[3].iov_len = strlen(ev->path)+1;
501 :
502 0 : messaging_send_iov(
503 : msg_ctx, messaging_server_id(msg_ctx),
504 : MSG_SMB_NOTIFY_TRIGGER, iov, ARRAY_SIZE(iov), NULL, 0);
505 0 : }
506 :
507 0 : static bool notifyd_parse_rec_change(uint8_t *buf, size_t bufsize,
508 : struct notify_rec_change_msg **pmsg,
509 : size_t *pathlen)
510 : {
511 0 : struct notify_rec_change_msg *msg;
512 :
513 0 : if (bufsize < offsetof(struct notify_rec_change_msg, path) + 1) {
514 0 : DBG_WARNING("message too short, ignoring: %zu\n", bufsize);
515 0 : return false;
516 : }
517 :
518 0 : *pmsg = msg = (struct notify_rec_change_msg *)buf;
519 0 : *pathlen = bufsize - offsetof(struct notify_rec_change_msg, path);
520 :
521 0 : DBG_DEBUG("Got rec_change_msg filter=%"PRIu32", "
522 : "subdir_filter=%"PRIu32", private_data=%p, path=%.*s\n",
523 : msg->instance.filter,
524 : msg->instance.subdir_filter,
525 : msg->instance.private_data,
526 : (int)(*pathlen),
527 : msg->path);
528 :
529 0 : return true;
530 : }
531 :
532 0 : static void notifyd_rec_change(struct messaging_context *msg_ctx,
533 : void *private_data, uint32_t msg_type,
534 : struct server_id src, DATA_BLOB *data)
535 : {
536 0 : struct notifyd_state *state = talloc_get_type_abort(
537 : private_data, struct notifyd_state);
538 0 : struct server_id_buf idbuf;
539 0 : struct notify_rec_change_msg *msg;
540 0 : size_t pathlen;
541 0 : bool ok;
542 0 : struct notify_instance instance;
543 :
544 0 : DBG_DEBUG("Got %zu bytes from %s\n", data->length,
545 : server_id_str_buf(src, &idbuf));
546 :
547 0 : ok = notifyd_parse_rec_change(data->data, data->length,
548 : &msg, &pathlen);
549 0 : if (!ok) {
550 0 : return;
551 : }
552 :
553 0 : memcpy(&instance, &msg->instance, sizeof(instance)); /* avoid SIGBUS */
554 :
555 0 : ok = notifyd_apply_rec_change(
556 0 : &src, msg->path, pathlen, &instance,
557 : state->entries, state->sys_notify_watch, state->sys_notify_ctx,
558 : state->msg_ctx);
559 0 : if (!ok) {
560 0 : DBG_DEBUG("notifyd_apply_rec_change failed, ignoring\n");
561 0 : return;
562 : }
563 :
564 0 : if ((state->log == NULL) || (state->ctdbd_conn == NULL)) {
565 0 : return;
566 : }
567 :
568 : #ifdef CLUSTER_SUPPORT
569 : {
570 :
571 : struct messaging_rec **tmp;
572 : struct messaging_reclog *log;
573 : struct iovec iov = { .iov_base = data->data, .iov_len = data->length };
574 :
575 : log = state->log;
576 :
577 : tmp = talloc_realloc(log, log->recs, struct messaging_rec *,
578 : log->num_recs+1);
579 : if (tmp == NULL) {
580 : DBG_WARNING("talloc_realloc failed, ignoring\n");
581 : return;
582 : }
583 : log->recs = tmp;
584 :
585 : log->recs[log->num_recs] = messaging_rec_create(
586 : log->recs, src, messaging_server_id(msg_ctx),
587 : msg_type, &iov, 1, NULL, 0);
588 :
589 : if (log->recs[log->num_recs] == NULL) {
590 : DBG_WARNING("messaging_rec_create failed, ignoring\n");
591 : return;
592 : }
593 :
594 : log->num_recs += 1;
595 :
596 : if (log->num_recs >= 100) {
597 : /*
598 : * Don't let the log grow too large
599 : */
600 : notifyd_broadcast_reclog(state->ctdbd_conn,
601 : messaging_server_id(msg_ctx), log);
602 : }
603 :
604 : }
605 : #endif
606 : }
607 :
608 : struct notifyd_trigger_state {
609 : struct messaging_context *msg_ctx;
610 : struct notify_trigger_msg *msg;
611 : bool recursive;
612 : bool covered_by_sys_notify;
613 : };
614 :
615 : static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
616 : void *private_data);
617 :
618 0 : static void notifyd_trigger(struct messaging_context *msg_ctx,
619 : void *private_data, uint32_t msg_type,
620 : struct server_id src, DATA_BLOB *data)
621 : {
622 0 : struct notifyd_state *state = talloc_get_type_abort(
623 : private_data, struct notifyd_state);
624 0 : struct server_id my_id = messaging_server_id(msg_ctx);
625 0 : struct notifyd_trigger_state tstate;
626 0 : const char *path;
627 0 : const char *p, *next_p;
628 :
629 0 : if (data->length < offsetof(struct notify_trigger_msg, path) + 1) {
630 0 : DBG_WARNING("message too short, ignoring: %zu\n",
631 : data->length);
632 0 : return;
633 : }
634 0 : if (data->data[data->length-1] != 0) {
635 0 : DBG_WARNING("path not 0-terminated, ignoring\n");;
636 0 : return;
637 : }
638 :
639 0 : tstate.msg_ctx = msg_ctx;
640 :
641 0 : tstate.covered_by_sys_notify = (src.vnn == my_id.vnn);
642 0 : tstate.covered_by_sys_notify &= !server_id_equal(&src, &my_id);
643 :
644 0 : tstate.msg = (struct notify_trigger_msg *)data->data;
645 0 : path = tstate.msg->path;
646 :
647 0 : DBG_DEBUG("Got trigger_msg action=%"PRIu32", filter=%"PRIu32", "
648 : "path=%s\n",
649 : tstate.msg->action,
650 : tstate.msg->filter,
651 : path);
652 :
653 0 : if (path[0] != '/') {
654 0 : DBG_WARNING("path %s does not start with /, ignoring\n",
655 : path);
656 0 : return;
657 : }
658 :
659 0 : for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
660 0 : ptrdiff_t path_len = p - path;
661 0 : TDB_DATA key;
662 0 : uint32_t i;
663 :
664 0 : next_p = strchr(p+1, '/');
665 0 : tstate.recursive = (next_p != NULL);
666 :
667 0 : DBG_DEBUG("Trying path %.*s\n", (int)path_len, path);
668 :
669 0 : key = (TDB_DATA) { .dptr = discard_const_p(uint8_t, path),
670 : .dsize = path_len };
671 :
672 0 : dbwrap_parse_record(state->entries, key,
673 : notifyd_trigger_parser, &tstate);
674 :
675 0 : if (state->peers == NULL) {
676 0 : continue;
677 : }
678 :
679 0 : if (src.vnn != my_id.vnn) {
680 0 : continue;
681 : }
682 :
683 0 : for (i=0; i<state->num_peers; i++) {
684 0 : if (state->peers[i]->db == NULL) {
685 : /*
686 : * Inactive peer, did not get a db yet
687 : */
688 0 : continue;
689 : }
690 0 : dbwrap_parse_record(state->peers[i]->db, key,
691 : notifyd_trigger_parser, &tstate);
692 : }
693 : }
694 : }
695 :
696 : static void notifyd_send_delete(struct messaging_context *msg_ctx,
697 : TDB_DATA key,
698 : struct notifyd_instance *instance);
699 :
700 0 : static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
701 : void *private_data)
702 :
703 : {
704 0 : struct notifyd_trigger_state *tstate = private_data;
705 0 : struct notify_event_msg msg = { .action = tstate->msg->action,
706 0 : .when = tstate->msg->when };
707 0 : struct iovec iov[2];
708 0 : size_t path_len = key.dsize;
709 0 : struct notifyd_instance *instances = NULL;
710 0 : size_t num_instances = 0;
711 0 : size_t i;
712 :
713 0 : if (!notifyd_parse_entry(data.dptr, data.dsize, &instances,
714 : &num_instances)) {
715 0 : DBG_DEBUG("Could not parse notifyd_entry\n");
716 0 : return;
717 : }
718 :
719 0 : DBG_DEBUG("Found %zu instances for %.*s\n",
720 : num_instances,
721 : (int)key.dsize,
722 : (char *)key.dptr);
723 :
724 0 : iov[0].iov_base = &msg;
725 0 : iov[0].iov_len = offsetof(struct notify_event_msg, path);
726 0 : iov[1].iov_base = tstate->msg->path + path_len + 1;
727 0 : iov[1].iov_len = strlen((char *)(iov[1].iov_base)) + 1;
728 :
729 0 : for (i=0; i<num_instances; i++) {
730 0 : struct notifyd_instance *instance = &instances[i];
731 0 : struct server_id_buf idbuf;
732 0 : uint32_t i_filter;
733 0 : NTSTATUS status;
734 :
735 0 : if (tstate->covered_by_sys_notify) {
736 0 : if (tstate->recursive) {
737 0 : i_filter = instance->internal_subdir_filter;
738 : } else {
739 0 : i_filter = instance->internal_filter;
740 : }
741 : } else {
742 0 : if (tstate->recursive) {
743 0 : i_filter = instance->instance.subdir_filter;
744 : } else {
745 0 : i_filter = instance->instance.filter;
746 : }
747 : }
748 :
749 0 : if ((i_filter & tstate->msg->filter) == 0) {
750 0 : continue;
751 : }
752 :
753 0 : msg.private_data = instance->instance.private_data;
754 :
755 0 : status = messaging_send_iov(
756 : tstate->msg_ctx, instance->client,
757 : MSG_PVFS_NOTIFY, iov, ARRAY_SIZE(iov), NULL, 0);
758 :
759 0 : DBG_DEBUG("messaging_send_iov to %s returned %s\n",
760 : server_id_str_buf(instance->client, &idbuf),
761 : nt_errstr(status));
762 :
763 0 : if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND) &&
764 0 : procid_is_local(&instance->client)) {
765 : /*
766 : * That process has died
767 : */
768 0 : notifyd_send_delete(tstate->msg_ctx, key, instance);
769 0 : continue;
770 : }
771 :
772 0 : if (!NT_STATUS_IS_OK(status)) {
773 0 : DBG_WARNING("messaging_send_iov returned %s\n",
774 : nt_errstr(status));
775 : }
776 : }
777 : }
778 :
779 : /*
780 : * Send a delete request to ourselves to properly discard a notify
781 : * record for an smbd that has died.
782 : */
783 :
784 0 : static void notifyd_send_delete(struct messaging_context *msg_ctx,
785 : TDB_DATA key,
786 : struct notifyd_instance *instance)
787 : {
788 0 : struct notify_rec_change_msg msg = {
789 0 : .instance.private_data = instance->instance.private_data
790 : };
791 0 : uint8_t nul = 0;
792 0 : struct iovec iov[3];
793 0 : NTSTATUS status;
794 :
795 : /*
796 : * Send a rec_change to ourselves to delete a dead entry
797 : */
798 :
799 0 : iov[0] = (struct iovec) {
800 : .iov_base = &msg,
801 : .iov_len = offsetof(struct notify_rec_change_msg, path) };
802 0 : iov[1] = (struct iovec) { .iov_base = key.dptr, .iov_len = key.dsize };
803 0 : iov[2] = (struct iovec) { .iov_base = &nul, .iov_len = sizeof(nul) };
804 :
805 0 : status = messaging_send_iov(msg_ctx,
806 : instance->client,
807 : MSG_SMB_NOTIFY_REC_CHANGE,
808 : iov,
809 : ARRAY_SIZE(iov),
810 : NULL,
811 : 0);
812 :
813 0 : if (!NT_STATUS_IS_OK(status)) {
814 0 : DBG_WARNING("messaging_send_iov failed: %s\n",
815 : nt_errstr(status));
816 : }
817 0 : }
818 :
819 0 : static void notifyd_get_db(struct messaging_context *msg_ctx,
820 : void *private_data, uint32_t msg_type,
821 : struct server_id src, DATA_BLOB *data)
822 : {
823 0 : struct notifyd_state *state = talloc_get_type_abort(
824 : private_data, struct notifyd_state);
825 0 : struct server_id_buf id1, id2;
826 0 : NTSTATUS status;
827 0 : uint64_t rec_index = UINT64_MAX;
828 0 : uint8_t index_buf[sizeof(uint64_t)];
829 0 : size_t dbsize;
830 0 : uint8_t *buf;
831 0 : struct iovec iov[2];
832 :
833 0 : dbsize = dbwrap_marshall(state->entries, NULL, 0);
834 :
835 0 : buf = talloc_array(talloc_tos(), uint8_t, dbsize);
836 0 : if (buf == NULL) {
837 0 : DBG_WARNING("talloc_array(%zu) failed\n", dbsize);
838 0 : return;
839 : }
840 :
841 0 : dbsize = dbwrap_marshall(state->entries, buf, dbsize);
842 :
843 0 : if (dbsize != talloc_get_size(buf)) {
844 0 : DBG_DEBUG("dbsize changed: %zu->%zu\n",
845 : talloc_get_size(buf),
846 : dbsize);
847 0 : TALLOC_FREE(buf);
848 0 : return;
849 : }
850 :
851 0 : if (state->log != NULL) {
852 0 : rec_index = state->log->rec_index;
853 : }
854 0 : SBVAL(index_buf, 0, rec_index);
855 :
856 0 : iov[0] = (struct iovec) { .iov_base = index_buf,
857 : .iov_len = sizeof(index_buf) };
858 0 : iov[1] = (struct iovec) { .iov_base = buf,
859 : .iov_len = dbsize };
860 :
861 0 : DBG_DEBUG("Sending %zu bytes to %s->%s\n",
862 : iov_buflen(iov, ARRAY_SIZE(iov)),
863 : server_id_str_buf(messaging_server_id(msg_ctx), &id1),
864 : server_id_str_buf(src, &id2));
865 :
866 0 : status = messaging_send_iov(msg_ctx, src, MSG_SMB_NOTIFY_DB,
867 : iov, ARRAY_SIZE(iov), NULL, 0);
868 0 : TALLOC_FREE(buf);
869 0 : if (!NT_STATUS_IS_OK(status)) {
870 0 : DBG_WARNING("messaging_send_iov failed: %s\n",
871 : nt_errstr(status));
872 : }
873 : }
874 :
875 : #ifdef CLUSTER_SUPPORT
876 :
877 : static int notifyd_add_proxy_syswatches(struct db_record *rec,
878 : void *private_data);
879 :
880 : static void notifyd_got_db(struct messaging_context *msg_ctx,
881 : void *private_data, uint32_t msg_type,
882 : struct server_id src, DATA_BLOB *data)
883 : {
884 : struct notifyd_state *state = talloc_get_type_abort(
885 : private_data, struct notifyd_state);
886 : struct notifyd_peer *p = NULL;
887 : struct server_id_buf idbuf;
888 : NTSTATUS status;
889 : int count;
890 : size_t i;
891 :
892 : for (i=0; i<state->num_peers; i++) {
893 : if (server_id_equal(&src, &state->peers[i]->pid)) {
894 : p = state->peers[i];
895 : break;
896 : }
897 : }
898 :
899 : if (p == NULL) {
900 : DBG_DEBUG("Did not find peer for db from %s\n",
901 : server_id_str_buf(src, &idbuf));
902 : return;
903 : }
904 :
905 : if (data->length < 8) {
906 : DBG_DEBUG("Got short db length %zu from %s\n", data->length,
907 : server_id_str_buf(src, &idbuf));
908 : TALLOC_FREE(p);
909 : return;
910 : }
911 :
912 : p->rec_index = BVAL(data->data, 0);
913 :
914 : p->db = db_open_rbt(p);
915 : if (p->db == NULL) {
916 : DBG_DEBUG("db_open_rbt failed\n");
917 : TALLOC_FREE(p);
918 : return;
919 : }
920 :
921 : status = dbwrap_unmarshall(p->db, data->data + 8,
922 : data->length - 8);
923 : if (!NT_STATUS_IS_OK(status)) {
924 : DBG_DEBUG("dbwrap_unmarshall returned %s for db %s\n",
925 : nt_errstr(status),
926 : server_id_str_buf(src, &idbuf));
927 : TALLOC_FREE(p);
928 : return;
929 : }
930 :
931 : dbwrap_traverse_read(p->db, notifyd_add_proxy_syswatches, state,
932 : &count);
933 :
934 : DBG_DEBUG("Database from %s contained %d records\n",
935 : server_id_str_buf(src, &idbuf),
936 : count);
937 : }
938 :
939 : static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
940 : struct server_id src,
941 : struct messaging_reclog *log)
942 : {
943 : enum ndr_err_code ndr_err;
944 : uint8_t msghdr[MESSAGE_HDR_LENGTH];
945 : DATA_BLOB blob;
946 : struct iovec iov[2];
947 : int ret;
948 :
949 : if (log == NULL) {
950 : return;
951 : }
952 :
953 : DBG_DEBUG("rec_index=%"PRIu64", num_recs=%"PRIu32"\n",
954 : log->rec_index,
955 : log->num_recs);
956 :
957 : message_hdr_put(msghdr, MSG_SMB_NOTIFY_REC_CHANGES, src,
958 : (struct server_id) {0 });
959 : iov[0] = (struct iovec) { .iov_base = msghdr,
960 : .iov_len = sizeof(msghdr) };
961 :
962 : ndr_err = ndr_push_struct_blob(
963 : &blob, log, log,
964 : (ndr_push_flags_fn_t)ndr_push_messaging_reclog);
965 : if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
966 : DBG_WARNING("ndr_push_messaging_recs failed: %s\n",
967 : ndr_errstr(ndr_err));
968 : goto done;
969 : }
970 : iov[1] = (struct iovec) { .iov_base = blob.data,
971 : .iov_len = blob.length };
972 :
973 : ret = ctdbd_messaging_send_iov(
974 : ctdbd_conn, CTDB_BROADCAST_CONNECTED,
975 : CTDB_SRVID_SAMBA_NOTIFY_PROXY, iov, ARRAY_SIZE(iov));
976 : TALLOC_FREE(blob.data);
977 : if (ret != 0) {
978 : DBG_WARNING("ctdbd_messaging_send failed: %s\n",
979 : strerror(ret));
980 : goto done;
981 : }
982 :
983 : log->rec_index += 1;
984 :
985 : done:
986 : log->num_recs = 0;
987 : TALLOC_FREE(log->recs);
988 : }
989 :
990 : struct notifyd_broadcast_reclog_state {
991 : struct tevent_context *ev;
992 : struct ctdbd_connection *ctdbd_conn;
993 : struct server_id src;
994 : struct messaging_reclog *log;
995 : };
996 :
997 : static void notifyd_broadcast_reclog_next(struct tevent_req *subreq);
998 :
999 : static struct tevent_req *notifyd_broadcast_reclog_send(
1000 : TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1001 : struct ctdbd_connection *ctdbd_conn, struct server_id src,
1002 : struct messaging_reclog *log)
1003 : {
1004 : struct tevent_req *req, *subreq;
1005 : struct notifyd_broadcast_reclog_state *state;
1006 :
1007 : req = tevent_req_create(mem_ctx, &state,
1008 : struct notifyd_broadcast_reclog_state);
1009 : if (req == NULL) {
1010 : return NULL;
1011 : }
1012 : state->ev = ev;
1013 : state->ctdbd_conn = ctdbd_conn;
1014 : state->src = src;
1015 : state->log = log;
1016 :
1017 : subreq = tevent_wakeup_send(state, state->ev,
1018 : timeval_current_ofs_msec(1000));
1019 : if (tevent_req_nomem(subreq, req)) {
1020 : return tevent_req_post(req, ev);
1021 : }
1022 : tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1023 : return req;
1024 : }
1025 :
1026 : static void notifyd_broadcast_reclog_next(struct tevent_req *subreq)
1027 : {
1028 : struct tevent_req *req = tevent_req_callback_data(
1029 : subreq, struct tevent_req);
1030 : struct notifyd_broadcast_reclog_state *state = tevent_req_data(
1031 : req, struct notifyd_broadcast_reclog_state);
1032 : bool ok;
1033 :
1034 : ok = tevent_wakeup_recv(subreq);
1035 : TALLOC_FREE(subreq);
1036 : if (!ok) {
1037 : tevent_req_oom(req);
1038 : return;
1039 : }
1040 :
1041 : notifyd_broadcast_reclog(state->ctdbd_conn, state->src, state->log);
1042 :
1043 : subreq = tevent_wakeup_send(state, state->ev,
1044 : timeval_current_ofs_msec(1000));
1045 : if (tevent_req_nomem(subreq, req)) {
1046 : return;
1047 : }
1048 : tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1049 : }
1050 :
1051 : static int notifyd_broadcast_reclog_recv(struct tevent_req *req)
1052 : {
1053 : return tevent_req_simple_recv_unix(req);
1054 : }
1055 :
1056 : struct notifyd_clean_peers_state {
1057 : struct tevent_context *ev;
1058 : struct notifyd_state *notifyd;
1059 : };
1060 :
1061 : static void notifyd_clean_peers_next(struct tevent_req *subreq);
1062 :
1063 : static struct tevent_req *notifyd_clean_peers_send(
1064 : TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1065 : struct notifyd_state *notifyd)
1066 : {
1067 : struct tevent_req *req, *subreq;
1068 : struct notifyd_clean_peers_state *state;
1069 :
1070 : req = tevent_req_create(mem_ctx, &state,
1071 : struct notifyd_clean_peers_state);
1072 : if (req == NULL) {
1073 : return NULL;
1074 : }
1075 : state->ev = ev;
1076 : state->notifyd = notifyd;
1077 :
1078 : subreq = tevent_wakeup_send(state, state->ev,
1079 : timeval_current_ofs_msec(30000));
1080 : if (tevent_req_nomem(subreq, req)) {
1081 : return tevent_req_post(req, ev);
1082 : }
1083 : tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1084 : return req;
1085 : }
1086 :
1087 : static void notifyd_clean_peers_next(struct tevent_req *subreq)
1088 : {
1089 : struct tevent_req *req = tevent_req_callback_data(
1090 : subreq, struct tevent_req);
1091 : struct notifyd_clean_peers_state *state = tevent_req_data(
1092 : req, struct notifyd_clean_peers_state);
1093 : struct notifyd_state *notifyd = state->notifyd;
1094 : size_t i;
1095 : bool ok;
1096 : time_t now = time(NULL);
1097 :
1098 : ok = tevent_wakeup_recv(subreq);
1099 : TALLOC_FREE(subreq);
1100 : if (!ok) {
1101 : tevent_req_oom(req);
1102 : return;
1103 : }
1104 :
1105 : i = 0;
1106 : while (i < notifyd->num_peers) {
1107 : struct notifyd_peer *p = notifyd->peers[i];
1108 :
1109 : if ((now - p->last_broadcast) > 60) {
1110 : struct server_id_buf idbuf;
1111 :
1112 : /*
1113 : * Haven't heard for more than 60 seconds. Call this
1114 : * peer dead
1115 : */
1116 :
1117 : DBG_DEBUG("peer %s died\n",
1118 : server_id_str_buf(p->pid, &idbuf));
1119 : /*
1120 : * This implicitly decrements notifyd->num_peers
1121 : */
1122 : TALLOC_FREE(p);
1123 : } else {
1124 : i += 1;
1125 : }
1126 : }
1127 :
1128 : subreq = tevent_wakeup_send(state, state->ev,
1129 : timeval_current_ofs_msec(30000));
1130 : if (tevent_req_nomem(subreq, req)) {
1131 : return;
1132 : }
1133 : tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1134 : }
1135 :
1136 : static int notifyd_clean_peers_recv(struct tevent_req *req)
1137 : {
1138 : return tevent_req_simple_recv_unix(req);
1139 : }
1140 :
1141 : static int notifyd_add_proxy_syswatches(struct db_record *rec,
1142 : void *private_data)
1143 : {
1144 : struct notifyd_state *state = talloc_get_type_abort(
1145 : private_data, struct notifyd_state);
1146 : struct db_context *db = dbwrap_record_get_db(rec);
1147 : TDB_DATA key = dbwrap_record_get_key(rec);
1148 : TDB_DATA value = dbwrap_record_get_value(rec);
1149 : struct notifyd_instance *instances = NULL;
1150 : size_t num_instances = 0;
1151 : size_t i;
1152 : char path[key.dsize+1];
1153 : bool ok;
1154 :
1155 : memcpy(path, key.dptr, key.dsize);
1156 : path[key.dsize] = '\0';
1157 :
1158 : ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1159 : &num_instances);
1160 : if (!ok) {
1161 : DBG_WARNING("Could not parse notifyd entry for %s\n", path);
1162 : return 0;
1163 : }
1164 :
1165 : for (i=0; i<num_instances; i++) {
1166 : struct notifyd_instance *instance = &instances[i];
1167 : uint32_t filter = instance->instance.filter;
1168 : uint32_t subdir_filter = instance->instance.subdir_filter;
1169 : int ret;
1170 :
1171 : /*
1172 : * This is a remote database. Pointers that we were
1173 : * given don't make sense locally. Initialize to NULL
1174 : * in case sys_notify_watch fails.
1175 : */
1176 : instances[i].sys_watch = NULL;
1177 :
1178 : ret = state->sys_notify_watch(
1179 : db, state->sys_notify_ctx, path,
1180 : &filter, &subdir_filter,
1181 : notifyd_sys_callback, state->msg_ctx,
1182 : &instance->sys_watch);
1183 : if (ret != 0) {
1184 : DBG_WARNING("inotify_watch returned %s\n",
1185 : strerror(errno));
1186 : }
1187 : }
1188 :
1189 : return 0;
1190 : }
1191 :
1192 : static int notifyd_db_del_syswatches(struct db_record *rec, void *private_data)
1193 : {
1194 : TDB_DATA key = dbwrap_record_get_key(rec);
1195 : TDB_DATA value = dbwrap_record_get_value(rec);
1196 : struct notifyd_instance *instances = NULL;
1197 : size_t num_instances = 0;
1198 : size_t i;
1199 : bool ok;
1200 :
1201 : ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1202 : &num_instances);
1203 : if (!ok) {
1204 : DBG_WARNING("Could not parse notifyd entry for %.*s\n",
1205 : (int)key.dsize, (char *)key.dptr);
1206 : return 0;
1207 : }
1208 : for (i=0; i<num_instances; i++) {
1209 : TALLOC_FREE(instances[i].sys_watch);
1210 : }
1211 : return 0;
1212 : }
1213 :
1214 : static int notifyd_peer_destructor(struct notifyd_peer *p)
1215 : {
1216 : struct notifyd_state *state = p->state;
1217 : size_t i;
1218 :
1219 : if (p->db != NULL) {
1220 : dbwrap_traverse_read(p->db, notifyd_db_del_syswatches,
1221 : NULL, NULL);
1222 : }
1223 :
1224 : for (i = 0; i<state->num_peers; i++) {
1225 : if (p == state->peers[i]) {
1226 : state->peers[i] = state->peers[state->num_peers-1];
1227 : state->num_peers -= 1;
1228 : break;
1229 : }
1230 : }
1231 : return 0;
1232 : }
1233 :
1234 : static struct notifyd_peer *notifyd_peer_new(
1235 : struct notifyd_state *state, struct server_id pid)
1236 : {
1237 : struct notifyd_peer *p, **tmp;
1238 :
1239 : tmp = talloc_realloc(state, state->peers, struct notifyd_peer *,
1240 : state->num_peers+1);
1241 : if (tmp == NULL) {
1242 : return NULL;
1243 : }
1244 : state->peers = tmp;
1245 :
1246 : p = talloc_zero(state->peers, struct notifyd_peer);
1247 : if (p == NULL) {
1248 : return NULL;
1249 : }
1250 : p->state = state;
1251 : p->pid = pid;
1252 :
1253 : state->peers[state->num_peers] = p;
1254 : state->num_peers += 1;
1255 :
1256 : talloc_set_destructor(p, notifyd_peer_destructor);
1257 :
1258 : return p;
1259 : }
1260 :
1261 : static void notifyd_apply_reclog(struct notifyd_peer *peer,
1262 : const uint8_t *msg, size_t msglen)
1263 : {
1264 : struct notifyd_state *state = peer->state;
1265 : DATA_BLOB blob = { .data = discard_const_p(uint8_t, msg),
1266 : .length = msglen };
1267 : struct server_id_buf idbuf;
1268 : struct messaging_reclog *log;
1269 : enum ndr_err_code ndr_err;
1270 : uint32_t i;
1271 :
1272 : if (peer->db == NULL) {
1273 : /*
1274 : * No db yet
1275 : */
1276 : return;
1277 : }
1278 :
1279 : log = talloc(peer, struct messaging_reclog);
1280 : if (log == NULL) {
1281 : DBG_DEBUG("talloc failed\n");
1282 : return;
1283 : }
1284 :
1285 : ndr_err = ndr_pull_struct_blob_all(
1286 : &blob, log, log,
1287 : (ndr_pull_flags_fn_t)ndr_pull_messaging_reclog);
1288 : if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1289 : DBG_DEBUG("ndr_pull_messaging_reclog failed: %s\n",
1290 : ndr_errstr(ndr_err));
1291 : goto fail;
1292 : }
1293 :
1294 : DBG_DEBUG("Got %"PRIu32" recs index %"PRIu64" from %s\n",
1295 : log->num_recs,
1296 : log->rec_index,
1297 : server_id_str_buf(peer->pid, &idbuf));
1298 :
1299 : if (log->rec_index != peer->rec_index) {
1300 : DBG_INFO("Got rec index %"PRIu64" from %s, "
1301 : "expected %"PRIu64"\n",
1302 : log->rec_index,
1303 : server_id_str_buf(peer->pid, &idbuf),
1304 : peer->rec_index);
1305 : goto fail;
1306 : }
1307 :
1308 : for (i=0; i<log->num_recs; i++) {
1309 : struct messaging_rec *r = log->recs[i];
1310 : struct notify_rec_change_msg *chg;
1311 : size_t pathlen;
1312 : bool ok;
1313 : struct notify_instance instance;
1314 :
1315 : ok = notifyd_parse_rec_change(r->buf.data, r->buf.length,
1316 : &chg, &pathlen);
1317 : if (!ok) {
1318 : DBG_INFO("notifyd_parse_rec_change failed\n");
1319 : goto fail;
1320 : }
1321 :
1322 : /* avoid SIGBUS */
1323 : memcpy(&instance, &chg->instance, sizeof(instance));
1324 :
1325 : ok = notifyd_apply_rec_change(&r->src, chg->path, pathlen,
1326 : &instance, peer->db,
1327 : state->sys_notify_watch,
1328 : state->sys_notify_ctx,
1329 : state->msg_ctx);
1330 : if (!ok) {
1331 : DBG_INFO("notifyd_apply_rec_change failed\n");
1332 : goto fail;
1333 : }
1334 : }
1335 :
1336 : peer->rec_index += 1;
1337 : peer->last_broadcast = time(NULL);
1338 :
1339 : TALLOC_FREE(log);
1340 : return;
1341 :
1342 : fail:
1343 : DBG_DEBUG("Dropping peer %s\n",
1344 : server_id_str_buf(peer->pid, &idbuf));
1345 : TALLOC_FREE(peer);
1346 : }
1347 :
1348 : /*
1349 : * Receive messaging_reclog (log of MSG_SMB_NOTIFY_REC_CHANGE
1350 : * messages) broadcasts by other notifyds. Several cases:
1351 : *
1352 : * We don't know the source. This creates a new peer. Creating a peer
1353 : * involves asking the peer for its full database. We assume ordered
1354 : * messages, so the new database will arrive before the next broadcast
1355 : * will.
1356 : *
1357 : * We know the source and the log index matches. We will apply the log
1358 : * locally to our peer's db as if we had received it from a local
1359 : * client.
1360 : *
1361 : * We know the source but the log index does not match. This means we
1362 : * lost a message. We just drop the whole peer and wait for the next
1363 : * broadcast, which will then trigger a fresh database pull.
1364 : */
1365 :
1366 : static int notifyd_snoop_broadcast(struct tevent_context *ev,
1367 : uint32_t src_vnn, uint32_t dst_vnn,
1368 : uint64_t dst_srvid,
1369 : const uint8_t *msg, size_t msglen,
1370 : void *private_data)
1371 : {
1372 : struct notifyd_state *state = talloc_get_type_abort(
1373 : private_data, struct notifyd_state);
1374 : struct server_id my_id = messaging_server_id(state->msg_ctx);
1375 : struct notifyd_peer *p;
1376 : uint32_t i;
1377 : uint32_t msg_type;
1378 : struct server_id src, dst;
1379 : struct server_id_buf idbuf;
1380 : NTSTATUS status;
1381 :
1382 : if (msglen < MESSAGE_HDR_LENGTH) {
1383 : DBG_DEBUG("Got short broadcast\n");
1384 : return 0;
1385 : }
1386 : message_hdr_get(&msg_type, &src, &dst, msg);
1387 :
1388 : if (msg_type != MSG_SMB_NOTIFY_REC_CHANGES) {
1389 : DBG_DEBUG("Got message %"PRIu32", ignoring\n", msg_type);
1390 : return 0;
1391 : }
1392 : if (server_id_equal(&src, &my_id)) {
1393 : DBG_DEBUG("Ignoring my own broadcast\n");
1394 : return 0;
1395 : }
1396 :
1397 : DBG_DEBUG("Got MSG_SMB_NOTIFY_REC_CHANGES from %s\n",
1398 : server_id_str_buf(src, &idbuf));
1399 :
1400 : for (i=0; i<state->num_peers; i++) {
1401 : if (server_id_equal(&state->peers[i]->pid, &src)) {
1402 :
1403 : DBG_DEBUG("Applying changes to peer %"PRIu32"\n", i);
1404 :
1405 : notifyd_apply_reclog(state->peers[i],
1406 : msg + MESSAGE_HDR_LENGTH,
1407 : msglen - MESSAGE_HDR_LENGTH);
1408 : return 0;
1409 : }
1410 : }
1411 :
1412 : DBG_DEBUG("Creating new peer for %s\n",
1413 : server_id_str_buf(src, &idbuf));
1414 :
1415 : p = notifyd_peer_new(state, src);
1416 : if (p == NULL) {
1417 : DBG_DEBUG("notifyd_peer_new failed\n");
1418 : return 0;
1419 : }
1420 :
1421 : status = messaging_send_buf(state->msg_ctx, src, MSG_SMB_NOTIFY_GET_DB,
1422 : NULL, 0);
1423 : if (!NT_STATUS_IS_OK(status)) {
1424 : DBG_DEBUG("messaging_send_buf failed: %s\n",
1425 : nt_errstr(status));
1426 : TALLOC_FREE(p);
1427 : return 0;
1428 : }
1429 :
1430 : return 0;
1431 : }
1432 : #endif
|