Line data Source code
1 : /*
2 : Unix SMB/CIFS implementation.
3 : Infrastructure for async requests
4 : Copyright (C) Volker Lendecke 2008
5 : Copyright (C) Stefan Metzmacher 2009
6 :
7 : ** NOTE! The following LGPL license applies to the tevent
8 : ** library. This does NOT imply that all of Samba is released
9 : ** under the LGPL
10 :
11 : This library is free software; you can redistribute it and/or
12 : modify it under the terms of the GNU Lesser General Public
13 : License as published by the Free Software Foundation; either
14 : version 3 of the License, or (at your option) any later version.
15 :
16 : This library is distributed in the hope that it will be useful,
17 : but WITHOUT ANY WARRANTY; without even the implied warranty of
18 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 : Lesser General Public License for more details.
20 :
21 : You should have received a copy of the GNU Lesser General Public
22 : License along with this library; if not, see <http://www.gnu.org/licenses/>.
23 : */
24 :
25 : #include "replace.h"
26 : #include "tevent.h"
27 : #include "tevent_internal.h"
28 : #include "tevent_util.h"
29 :
30 : struct tevent_queue_entry {
31 : struct tevent_queue_entry *prev, *next;
32 : struct tevent_queue *queue;
33 :
34 : bool triggered;
35 :
36 : struct tevent_req *req;
37 : struct tevent_context *ev;
38 :
39 : tevent_queue_trigger_fn_t trigger;
40 : void *private_data;
41 : uint64_t tag;
42 : };
43 :
44 : struct tevent_queue {
45 : const char *name;
46 : const char *location;
47 :
48 : bool running;
49 : struct tevent_immediate *immediate;
50 :
51 : size_t length;
52 : struct tevent_queue_entry *list;
53 : };
54 :
55 : static void tevent_queue_immediate_trigger(struct tevent_context *ev,
56 : struct tevent_immediate *im,
57 : void *private_data);
58 :
59 6711598 : static int tevent_queue_entry_destructor(struct tevent_queue_entry *e)
60 : {
61 6711598 : struct tevent_queue *q = e->queue;
62 :
63 6711598 : if (!q) {
64 0 : return 0;
65 : }
66 :
67 6711598 : tevent_trace_queue_callback(q->list->ev, e, TEVENT_EVENT_TRACE_DETACH);
68 6711598 : DLIST_REMOVE(q->list, e);
69 6711598 : q->length--;
70 :
71 6711598 : if (!q->running) {
72 2542 : return 0;
73 : }
74 :
75 6709056 : if (!q->list) {
76 5528822 : return 0;
77 : }
78 :
79 1180234 : if (q->list->triggered) {
80 203 : return 0;
81 : }
82 :
83 1180031 : tevent_schedule_immediate(q->immediate,
84 : q->list->ev,
85 : tevent_queue_immediate_trigger,
86 : q);
87 :
88 1180031 : return 0;
89 : }
90 :
91 415868 : static int tevent_queue_destructor(struct tevent_queue *q)
92 : {
93 415868 : q->running = false;
94 :
95 415870 : while (q->list) {
96 2 : struct tevent_queue_entry *e = q->list;
97 2 : talloc_free(e);
98 : }
99 :
100 415868 : return 0;
101 : }
102 :
103 416675 : struct tevent_queue *_tevent_queue_create(TALLOC_CTX *mem_ctx,
104 : const char *name,
105 : const char *location)
106 : {
107 : struct tevent_queue *queue;
108 :
109 416675 : queue = talloc_zero(mem_ctx, struct tevent_queue);
110 416675 : if (!queue) {
111 0 : return NULL;
112 : }
113 :
114 416675 : queue->name = talloc_strdup(queue, name);
115 416675 : if (!queue->name) {
116 0 : talloc_free(queue);
117 0 : return NULL;
118 : }
119 416675 : queue->immediate = tevent_create_immediate(queue);
120 416675 : if (!queue->immediate) {
121 0 : talloc_free(queue);
122 0 : return NULL;
123 : }
124 :
125 416675 : queue->location = location;
126 :
127 : /* queue is running by default */
128 416675 : queue->running = true;
129 :
130 416675 : talloc_set_destructor(queue, tevent_queue_destructor);
131 416675 : return queue;
132 : }
133 :
134 1837478 : static void tevent_queue_immediate_trigger(struct tevent_context *ev,
135 : struct tevent_immediate *im,
136 : void *private_data)
137 : {
138 : struct tevent_queue *q =
139 1837478 : talloc_get_type_abort(private_data,
140 : struct tevent_queue);
141 :
142 1837478 : if (!q->running) {
143 0 : return;
144 : }
145 :
146 1837478 : if (!q->list) {
147 0 : return;
148 : }
149 :
150 1837478 : tevent_trace_queue_callback(ev, q->list,
151 : TEVENT_EVENT_TRACE_BEFORE_HANDLER);
152 : /* Set the call depth of the request coming from the queue. */
153 1837478 : tevent_thread_call_depth_set(q->list->req->internal.call_depth);
154 1837478 : q->list->triggered = true;
155 1837478 : q->list->trigger(q->list->req, q->list->private_data);
156 : }
157 :
158 1 : static void tevent_queue_noop_trigger(struct tevent_req *req,
159 : void *_private_data)
160 : {
161 : /* this is doing nothing but blocking the queue */
162 1 : }
163 :
164 6711598 : static struct tevent_queue_entry *tevent_queue_add_internal(
165 : struct tevent_queue *queue,
166 : struct tevent_context *ev,
167 : struct tevent_req *req,
168 : tevent_queue_trigger_fn_t trigger,
169 : void *private_data,
170 : bool allow_direct)
171 : {
172 : struct tevent_queue_entry *e;
173 :
174 6711598 : e = talloc_zero(req, struct tevent_queue_entry);
175 6711598 : if (e == NULL) {
176 0 : return NULL;
177 : }
178 :
179 : /*
180 : * if there is no trigger, it is just a blocker
181 : */
182 6711598 : if (trigger == NULL) {
183 1 : trigger = tevent_queue_noop_trigger;
184 : }
185 :
186 6711598 : e->queue = queue;
187 6711598 : e->req = req;
188 6711598 : e->ev = ev;
189 6711598 : e->trigger = trigger;
190 6711598 : e->private_data = private_data;
191 :
192 6711598 : if (queue->length > 0) {
193 : /*
194 : * if there are already entries in the
195 : * queue do not optimize.
196 : */
197 1180241 : allow_direct = false;
198 : }
199 :
200 6711598 : if (req->async.fn != NULL) {
201 : /*
202 : * If the caller wants to optimize for the
203 : * empty queue case, call the trigger only
204 : * if there is no callback defined for the
205 : * request yet.
206 : */
207 0 : allow_direct = false;
208 : }
209 :
210 6711598 : DLIST_ADD_END(queue->list, e);
211 6711598 : queue->length++;
212 6711598 : talloc_set_destructor(e, tevent_queue_entry_destructor);
213 6711598 : tevent_trace_queue_callback(ev, e, TEVENT_EVENT_TRACE_ATTACH);
214 :
215 6711598 : if (!queue->running) {
216 12 : return e;
217 : }
218 :
219 6711586 : if (queue->list->triggered) {
220 1061092 : return e;
221 : }
222 :
223 : /*
224 : * If allowed we directly call the trigger
225 : * avoiding possible delays caused by
226 : * an immediate event.
227 : */
228 5650494 : if (allow_direct) {
229 4873904 : tevent_trace_queue_callback(ev,
230 : queue->list,
231 : TEVENT_EVENT_TRACE_BEFORE_HANDLER);
232 4873904 : queue->list->triggered = true;
233 4873904 : queue->list->trigger(queue->list->req,
234 4873904 : queue->list->private_data);
235 4873904 : return e;
236 : }
237 :
238 776590 : tevent_schedule_immediate(queue->immediate,
239 : queue->list->ev,
240 : tevent_queue_immediate_trigger,
241 : queue);
242 :
243 776590 : return e;
244 : }
245 :
246 657180 : bool tevent_queue_add(struct tevent_queue *queue,
247 : struct tevent_context *ev,
248 : struct tevent_req *req,
249 : tevent_queue_trigger_fn_t trigger,
250 : void *private_data)
251 : {
252 : struct tevent_queue_entry *e;
253 :
254 657180 : e = tevent_queue_add_internal(queue, ev, req,
255 : trigger, private_data, false);
256 657180 : if (e == NULL) {
257 0 : return false;
258 : }
259 :
260 657180 : return true;
261 : }
262 :
263 4004 : struct tevent_queue_entry *tevent_queue_add_entry(
264 : struct tevent_queue *queue,
265 : struct tevent_context *ev,
266 : struct tevent_req *req,
267 : tevent_queue_trigger_fn_t trigger,
268 : void *private_data)
269 : {
270 4004 : return tevent_queue_add_internal(queue, ev, req,
271 : trigger, private_data, false);
272 : }
273 :
274 6050414 : struct tevent_queue_entry *tevent_queue_add_optimize_empty(
275 : struct tevent_queue *queue,
276 : struct tevent_context *ev,
277 : struct tevent_req *req,
278 : tevent_queue_trigger_fn_t trigger,
279 : void *private_data)
280 : {
281 6050414 : return tevent_queue_add_internal(queue, ev, req,
282 : trigger, private_data, true);
283 : }
284 :
285 3 : void tevent_queue_entry_untrigger(struct tevent_queue_entry *entry)
286 : {
287 3 : if (entry->queue->running) {
288 0 : abort();
289 : }
290 :
291 3 : if (entry->queue->list != entry) {
292 0 : abort();
293 : }
294 :
295 3 : entry->triggered = false;
296 3 : }
297 :
298 9299 : void tevent_queue_start(struct tevent_queue *queue)
299 : {
300 9299 : if (queue->running) {
301 : /* already started */
302 9296 : return;
303 : }
304 :
305 3 : queue->running = true;
306 :
307 3 : if (!queue->list) {
308 0 : return;
309 : }
310 :
311 3 : if (queue->list->triggered) {
312 0 : return;
313 : }
314 :
315 3 : tevent_schedule_immediate(queue->immediate,
316 : queue->list->ev,
317 : tevent_queue_immediate_trigger,
318 : queue);
319 : }
320 :
321 62504 : void tevent_queue_stop(struct tevent_queue *queue)
322 : {
323 62504 : queue->running = false;
324 62504 : }
325 :
326 3004629 : size_t tevent_queue_length(struct tevent_queue *queue)
327 : {
328 3004629 : return queue->length;
329 : }
330 :
331 0 : bool tevent_queue_running(struct tevent_queue *queue)
332 : {
333 0 : return queue->running;
334 : }
335 :
336 : struct tevent_queue_wait_state {
337 : uint8_t dummy;
338 : };
339 :
340 : static void tevent_queue_wait_trigger(struct tevent_req *req,
341 : void *private_data);
342 :
343 130429 : struct tevent_req *tevent_queue_wait_send(TALLOC_CTX *mem_ctx,
344 : struct tevent_context *ev,
345 : struct tevent_queue *queue)
346 : {
347 : struct tevent_req *req;
348 : struct tevent_queue_wait_state *state;
349 : bool ok;
350 :
351 130429 : req = tevent_req_create(mem_ctx, &state,
352 : struct tevent_queue_wait_state);
353 130429 : if (req == NULL) {
354 0 : return NULL;
355 : }
356 :
357 130429 : ok = tevent_queue_add(queue, ev, req,
358 : tevent_queue_wait_trigger,
359 : NULL);
360 130429 : if (!ok) {
361 0 : tevent_req_oom(req);
362 0 : return tevent_req_post(req, ev);
363 : }
364 :
365 130429 : return req;
366 : }
367 :
368 130421 : static void tevent_queue_wait_trigger(struct tevent_req *req,
369 : void *private_data)
370 : {
371 130421 : tevent_req_done(req);
372 130421 : }
373 :
374 130379 : bool tevent_queue_wait_recv(struct tevent_req *req)
375 : {
376 : enum tevent_req_state state;
377 : uint64_t err;
378 :
379 130379 : if (tevent_req_is_error(req, &state, &err)) {
380 0 : tevent_req_received(req);
381 0 : return false;
382 : }
383 :
384 130379 : tevent_req_received(req);
385 130379 : return true;
386 : }
387 :
388 10 : void tevent_queue_entry_set_tag(struct tevent_queue_entry *qe, uint64_t tag)
389 : {
390 10 : if (qe == NULL) {
391 0 : return;
392 : }
393 :
394 10 : qe->tag = tag;
395 : }
396 :
397 11 : uint64_t tevent_queue_entry_get_tag(const struct tevent_queue_entry *qe)
398 : {
399 11 : if (qe == NULL) {
400 1 : return 0;
401 : }
402 :
403 10 : return qe->tag;
404 : }
|