Line data Source code
1 : /* 2 : * Unix SMB/CIFS implementation. 3 : * threadpool implementation based on pthreads 4 : * Copyright (C) Volker Lendecke 2009,2011 5 : * 6 : * This program is free software; you can redistribute it and/or modify 7 : * it under the terms of the GNU General Public License as published by 8 : * the Free Software Foundation; either version 3 of the License, or 9 : * (at your option) any later version. 10 : * 11 : * This program is distributed in the hope that it will be useful, 12 : * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 : * GNU General Public License for more details. 15 : * 16 : * You should have received a copy of the GNU General Public License 17 : * along with this program. If not, see <http://www.gnu.org/licenses/>. 18 : */ 19 : 20 : #include "replace.h" 21 : #include "system/filesys.h" 22 : #include "pthreadpool_pipe.h" 23 : #include "pthreadpool.h" 24 : 25 : struct pthreadpool_pipe { 26 : struct pthreadpool *pool; 27 : int num_jobs; 28 : pid_t pid; 29 : int pipe_fds[2]; 30 : }; 31 : 32 : static int pthreadpool_pipe_signal(int jobid, 33 : void (*job_fn)(void *private_data), 34 : void *job_private_data, 35 : void *private_data); 36 : 37 6 : int pthreadpool_pipe_init(unsigned max_threads, 38 : struct pthreadpool_pipe **presult) 39 : { 40 6 : struct pthreadpool_pipe *pool; 41 6 : int ret; 42 : 43 6 : pool = calloc(1, sizeof(struct pthreadpool_pipe)); 44 6 : if (pool == NULL) { 45 0 : return ENOMEM; 46 : } 47 6 : pool->pid = getpid(); 48 : 49 6 : ret = pipe(pool->pipe_fds); 50 6 : if (ret == -1) { 51 0 : int err = errno; 52 0 : free(pool); 53 0 : return err; 54 : } 55 : 56 6 : ret = pthreadpool_init(max_threads, &pool->pool, 57 : pthreadpool_pipe_signal, pool); 58 6 : if (ret != 0) { 59 0 : close(pool->pipe_fds[0]); 60 0 : close(pool->pipe_fds[1]); 61 0 : free(pool); 62 0 : return ret; 63 : } 64 : 65 6 : *presult = pool; 66 6 : return 0; 67 : } 68 : 69 9884 : static int pthreadpool_pipe_signal(int jobid, 70 : void (*job_fn)(void *private_data), 71 : void *job_private_data, 72 : void *private_data) 73 : { 74 9884 : struct pthreadpool_pipe *pool = private_data; 75 9884 : ssize_t written; 76 : 77 9884 : do { 78 9884 : written = write(pool->pipe_fds[1], &jobid, sizeof(jobid)); 79 10002 : } while ((written == -1) && (errno == EINTR)); 80 : 81 10002 : if (written != sizeof(jobid)) { 82 0 : return errno; 83 : } 84 : 85 0 : return 0; 86 : } 87 : 88 6 : int pthreadpool_pipe_destroy(struct pthreadpool_pipe *pool) 89 : { 90 6 : int ret; 91 : 92 6 : if (pool->num_jobs != 0) { 93 0 : return EBUSY; 94 : } 95 : 96 5 : ret = pthreadpool_destroy(pool->pool); 97 5 : if (ret != 0) { 98 0 : return ret; 99 : } 100 : 101 5 : close(pool->pipe_fds[0]); 102 5 : pool->pipe_fds[0] = -1; 103 : 104 5 : close(pool->pipe_fds[1]); 105 5 : pool->pipe_fds[1] = -1; 106 : 107 5 : free(pool); 108 5 : return 0; 109 : } 110 : 111 10004 : static int pthreadpool_pipe_reinit(struct pthreadpool_pipe *pool) 112 : { 113 10004 : pid_t pid = getpid(); 114 10004 : int signal_fd; 115 10004 : int ret; 116 : 117 10004 : if (pid == pool->pid) { 118 0 : return 0; 119 : } 120 : 121 0 : signal_fd = pool->pipe_fds[0]; 122 : 123 0 : close(pool->pipe_fds[0]); 124 0 : pool->pipe_fds[0] = -1; 125 : 126 0 : close(pool->pipe_fds[1]); 127 0 : pool->pipe_fds[1] = -1; 128 : 129 0 : ret = pipe(pool->pipe_fds); 130 0 : if (ret != 0) { 131 0 : return errno; 132 : } 133 : 134 0 : ret = dup2(pool->pipe_fds[0], signal_fd); 135 0 : if (ret != 0) { 136 0 : return errno; 137 : } 138 : 139 0 : pool->pipe_fds[0] = signal_fd; 140 0 : pool->num_jobs = 0; 141 : 142 0 : return 0; 143 : } 144 : 145 10004 : int pthreadpool_pipe_add_job(struct pthreadpool_pipe *pool, int job_id, 146 : void (*fn)(void *private_data), 147 : void *private_data) 148 : { 149 10004 : int ret; 150 : 151 10004 : ret = pthreadpool_pipe_reinit(pool); 152 10004 : if (ret != 0) { 153 0 : return ret; 154 : } 155 : 156 10004 : ret = pthreadpool_add_job(pool->pool, job_id, fn, private_data); 157 10004 : if (ret != 0) { 158 0 : return ret; 159 : } 160 : 161 10004 : pool->num_jobs += 1; 162 : 163 10004 : return 0; 164 : } 165 : 166 2 : int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool) 167 : { 168 2 : return pool->pipe_fds[0]; 169 : } 170 : 171 10003 : int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids, 172 : unsigned num_jobids) 173 : { 174 10003 : ssize_t to_read, nread, num_jobs; 175 10003 : pid_t pid = getpid(); 176 : 177 10003 : if (pool->pid != pid) { 178 0 : return EINVAL; 179 : } 180 : 181 10003 : to_read = sizeof(int) * num_jobids; 182 : 183 10003 : do { 184 10003 : nread = read(pool->pipe_fds[0], jobids, to_read); 185 10003 : } while ((nread == -1) && (errno == EINTR)); 186 : 187 10003 : if (nread == -1) { 188 0 : return -errno; 189 : } 190 10003 : if ((nread % sizeof(int)) != 0) { 191 0 : return -EINVAL; 192 : } 193 : 194 10003 : num_jobs = nread / sizeof(int); 195 : 196 10003 : if (num_jobs > pool->num_jobs) { 197 0 : return -EINVAL; 198 : } 199 10003 : pool->num_jobs -= num_jobs; 200 : 201 10003 : return num_jobs; 202 : }