Line data Source code
1 : /*
2 : Unix SMB/CIFS Implementation.
3 : DSDB replication service outgoing Pull-Replication
4 :
5 : Copyright (C) Stefan Metzmacher 2007
6 :
7 : This program is free software; you can redistribute it and/or modify
8 : it under the terms of the GNU General Public License as published by
9 : the Free Software Foundation; either version 3 of the License, or
10 : (at your option) any later version.
11 :
12 : This program is distributed in the hope that it will be useful,
13 : but WITHOUT ANY WARRANTY; without even the implied warranty of
14 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 : GNU General Public License for more details.
16 :
17 : You should have received a copy of the GNU General Public License
18 : along with this program. If not, see <http://www.gnu.org/licenses/>.
19 :
20 : */
21 :
22 : #include "includes.h"
23 : #include "dsdb/samdb/samdb.h"
24 : #include "auth/auth.h"
25 : #include "samba/service.h"
26 : #include "lib/events/events.h"
27 : #include "dsdb/repl/drepl_service.h"
28 : #include <ldb_errors.h>
29 : #include "../lib/util/dlinklist.h"
30 : #include "librpc/gen_ndr/ndr_misc.h"
31 : #include "librpc/gen_ndr/ndr_drsuapi.h"
32 : #include "librpc/gen_ndr/ndr_drsblobs.h"
33 : #include "libcli/composite/composite.h"
34 : #include "libcli/security/security.h"
35 :
36 : #undef DBGC_CLASS
37 : #define DBGC_CLASS DBGC_DRS_REPL
38 :
39 : /*
40 : update repsFrom/repsTo error information
41 : */
42 8580 : void drepl_reps_update(struct dreplsrv_service *s, const char *reps_attr,
43 : struct ldb_dn *dn,
44 : struct GUID *source_dsa_obj_guid, WERROR status)
45 : {
46 0 : struct repsFromToBlob *reps;
47 0 : uint32_t count, i;
48 0 : WERROR werr;
49 8580 : TALLOC_CTX *tmp_ctx = talloc_new(s);
50 0 : time_t t;
51 0 : NTTIME now;
52 :
53 8580 : t = time(NULL);
54 8580 : unix_to_nt_time(&now, t);
55 :
56 8580 : werr = dsdb_loadreps(s->samdb, tmp_ctx, dn, reps_attr, &reps, &count);
57 8580 : if (!W_ERROR_IS_OK(werr)) {
58 0 : talloc_free(tmp_ctx);
59 22 : return;
60 : }
61 :
62 13569 : for (i=0; i<count; i++) {
63 13547 : if (GUID_equal(source_dsa_obj_guid,
64 13547 : &reps[i].ctr.ctr1.source_dsa_obj_guid)) {
65 8558 : break;
66 : }
67 : }
68 :
69 8580 : if (i == count) {
70 : /* no record to update */
71 22 : talloc_free(tmp_ctx);
72 22 : return;
73 : }
74 :
75 : /* only update the status fields */
76 8558 : reps[i].ctr.ctr1.last_attempt = now;
77 8558 : reps[i].ctr.ctr1.result_last_attempt = status;
78 8558 : if (W_ERROR_IS_OK(status)) {
79 3127 : reps[i].ctr.ctr1.last_success = now;
80 3127 : reps[i].ctr.ctr1.consecutive_sync_failures = 0;
81 : } else {
82 5431 : reps[i].ctr.ctr1.consecutive_sync_failures++;
83 : }
84 :
85 8558 : werr = dsdb_savereps(s->samdb, tmp_ctx, dn, reps_attr, reps, count);
86 8558 : if (!W_ERROR_IS_OK(werr)) {
87 0 : DEBUG(2,("drepl_reps_update: Failed to save %s for %s: %s\n",
88 : reps_attr, ldb_dn_get_linearized(dn), win_errstr(werr)));
89 : }
90 8558 : talloc_free(tmp_ctx);
91 : }
92 :
93 4239 : WERROR dreplsrv_schedule_partition_pull_source(struct dreplsrv_service *s,
94 : struct dreplsrv_partition_source_dsa *source,
95 : uint32_t options,
96 : enum drsuapi_DsExtendedOperation extended_op,
97 : uint64_t fsmo_info,
98 : dreplsrv_extended_callback_t callback,
99 : void *cb_data)
100 : {
101 0 : struct dreplsrv_out_operation *op;
102 :
103 4239 : op = talloc_zero(s, struct dreplsrv_out_operation);
104 4239 : W_ERROR_HAVE_NO_MEMORY(op);
105 :
106 4239 : op->service = s;
107 : /*
108 : * source may either be the long-term list of partners, or
109 : * from dreplsrv_partition_source_dsa_temporary(). Because it
110 : * can be either, we can't talloc_steal() it here, so we
111 : * instead we reference it.
112 : *
113 : * We never talloc_free() the p->sources pointers - indeed we
114 : * never remove them - and the temp source will otherwise go
115 : * away with the msg it is allocated on.
116 : *
117 : * Finally the pointer created in drepl_request_extended_op()
118 : * is removed with talloc_unlink().
119 : *
120 : */
121 4239 : op->source_dsa = talloc_reference(op, source);
122 4239 : if (!op->source_dsa) {
123 0 : return WERR_NOT_ENOUGH_MEMORY;
124 : }
125 :
126 4239 : op->options = options;
127 4239 : op->extended_op = extended_op;
128 4239 : op->fsmo_info = fsmo_info;
129 4239 : op->callback = callback;
130 4239 : op->cb_data = cb_data;
131 4239 : op->schedule_time = time(NULL);
132 4239 : op->more_flags = 0;
133 :
134 4239 : DLIST_ADD_END(s->ops.pending, op);
135 :
136 4239 : return WERR_OK;
137 : }
138 :
139 1102 : static WERROR dreplsrv_schedule_partition_pull(struct dreplsrv_service *s,
140 : struct dreplsrv_partition *p,
141 : TALLOC_CTX *mem_ctx)
142 : {
143 10 : WERROR status;
144 10 : struct dreplsrv_partition_source_dsa *cur;
145 :
146 1269 : for (cur = p->sources; cur; cur = cur->next) {
147 167 : status = dreplsrv_schedule_partition_pull_source(s, cur,
148 : 0, DRSUAPI_EXOP_NONE, 0,
149 : NULL, NULL);
150 167 : W_ERROR_NOT_OK_RETURN(status);
151 : }
152 :
153 1102 : return WERR_OK;
154 : }
155 :
156 226 : WERROR dreplsrv_schedule_pull_replication(struct dreplsrv_service *s, TALLOC_CTX *mem_ctx)
157 : {
158 2 : WERROR status;
159 2 : struct dreplsrv_partition *p;
160 :
161 1328 : for (p = s->partitions; p; p = p->next) {
162 1102 : status = dreplsrv_schedule_partition_pull(s, p, mem_ctx);
163 1102 : W_ERROR_NOT_OK_RETURN(status);
164 : }
165 :
166 226 : return WERR_OK;
167 : }
168 :
169 :
170 3924 : static void dreplsrv_pending_op_callback(struct tevent_req *subreq)
171 : {
172 3924 : struct dreplsrv_out_operation *op = tevent_req_callback_data(subreq,
173 : struct dreplsrv_out_operation);
174 3924 : struct repsFromTo1 *rf = op->source_dsa->repsFrom1;
175 3924 : struct dreplsrv_service *s = op->service;
176 0 : WERROR werr;
177 :
178 3924 : werr = dreplsrv_op_pull_source_recv(subreq);
179 3924 : TALLOC_FREE(subreq);
180 :
181 3924 : DEBUG(4,("dreplsrv_op_pull_source(%s) for %s\n", win_errstr(werr),
182 : ldb_dn_get_linearized(op->source_dsa->partition->dn)));
183 :
184 3924 : if (op->extended_op == DRSUAPI_EXOP_NONE) {
185 2050 : drepl_reps_update(s, "repsFrom", op->source_dsa->partition->dn,
186 : &rf->source_dsa_obj_guid, werr);
187 : }
188 :
189 3924 : if (op->callback) {
190 2837 : op->callback(s, werr, op->extended_ret, op->cb_data);
191 : }
192 3924 : talloc_free(op);
193 3924 : s->ops.current = NULL;
194 3924 : dreplsrv_run_pending_ops(s);
195 3924 : }
196 :
197 4706 : void dreplsrv_run_pull_ops(struct dreplsrv_service *s)
198 : {
199 0 : struct dreplsrv_out_operation *op;
200 0 : time_t t;
201 0 : NTTIME now;
202 0 : struct tevent_req *subreq;
203 0 : WERROR werr;
204 :
205 4706 : if (s->ops.n_current || s->ops.current) {
206 : /* if there's still one running, we're done */
207 4391 : return;
208 : }
209 :
210 4239 : if (!s->ops.pending) {
211 : /* if there're no pending operations, we're done */
212 0 : return;
213 : }
214 :
215 4239 : t = time(NULL);
216 4239 : unix_to_nt_time(&now, t);
217 :
218 4239 : op = s->ops.pending;
219 4239 : s->ops.current = op;
220 4239 : DLIST_REMOVE(s->ops.pending, op);
221 :
222 4239 : op->source_dsa->repsFrom1->last_attempt = now;
223 :
224 : /* check if inbound replication is enabled */
225 4239 : if (!(op->options & DRSUAPI_DRS_SYNC_FORCED)) {
226 0 : uint32_t rep_options;
227 3322 : if (samdb_ntds_options(op->service->samdb, &rep_options) != LDB_SUCCESS) {
228 0 : werr = WERR_DS_DRA_INTERNAL_ERROR;
229 0 : goto failed;
230 : }
231 :
232 3322 : if ((rep_options & DS_NTDSDSA_OPT_DISABLE_INBOUND_REPL)) {
233 315 : werr = WERR_DS_DRA_SINK_DISABLED;
234 315 : goto failed;
235 : }
236 : }
237 :
238 3924 : subreq = dreplsrv_op_pull_source_send(op, s->task->event_ctx, op);
239 3924 : if (!subreq) {
240 0 : werr = WERR_NOT_ENOUGH_MEMORY;
241 0 : goto failed;
242 : }
243 :
244 3924 : tevent_req_set_callback(subreq, dreplsrv_pending_op_callback, op);
245 3924 : return;
246 :
247 315 : failed:
248 315 : if (op->extended_op == DRSUAPI_EXOP_NONE) {
249 315 : drepl_reps_update(s, "repsFrom", op->source_dsa->partition->dn,
250 315 : &op->source_dsa->repsFrom1->source_dsa_obj_guid, werr);
251 : }
252 : /* unblock queue processing */
253 315 : s->ops.current = NULL;
254 : /*
255 : * let the callback do its job just like in any other failure situation
256 : */
257 315 : if (op->callback) {
258 2 : op->callback(s, werr, op->extended_ret, op->cb_data);
259 : }
260 : }
|