Diffstat (limited to 'parallel-checkout.c')
 parallel-checkout.c | 300 ++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 274 insertions(+), 26 deletions(-)
diff --git a/parallel-checkout.c b/parallel-checkout.c
index 590e2a3046..836154fec6 100644
--- a/parallel-checkout.c
+++ b/parallel-checkout.c
@@ -1,28 +1,14 @@
#include "cache.h"
#include "entry.h"
#include "parallel-checkout.h"
+#include "pkt-line.h"
+#include "run-command.h"
+#include "sigchain.h"
#include "streaming.h"
-enum pc_item_status {
- PC_ITEM_PENDING = 0,
- PC_ITEM_WRITTEN,
- /*
- * The entry could not be written because there was another file
- * already present in its path or leading directories. Since
- * checkout_entry_ca() removes such files from the working tree before
- * enqueueing the entry for parallel checkout, it means that there was
- * a path collision among the entries being written.
- */
- PC_ITEM_COLLIDED,
- PC_ITEM_FAILED,
-};
-
-struct parallel_checkout_item {
- /* pointer to a istate->cache[] entry. Not owned by us. */
- struct cache_entry *ce;
- struct conv_attrs ca;
- struct stat st;
- enum pc_item_status status;
+struct pc_worker {
+ struct child_process cp;
+ size_t next_item_to_complete, nr_items_to_complete;
};
struct parallel_checkout {
@@ -59,6 +45,7 @@ static int is_eligible_for_parallel_checkout(const struct cache_entry *ce,
const struct conv_attrs *ca)
{
enum conv_attrs_classification c;
+ size_t packed_item_size;
/*
* Symlinks cannot be checked out in parallel as, in case of path
@@ -69,6 +56,18 @@ static int is_eligible_for_parallel_checkout(const struct cache_entry *ce,
if (!S_ISREG(ce->ce_mode))
return 0;
+ packed_item_size = sizeof(struct pc_item_fixed_portion) + ce->ce_namelen +
+ (ca->working_tree_encoding ? strlen(ca->working_tree_encoding) : 0);
+
+ /*
+ * The amount of data we send to the workers per checkout item is
+ * typically small (75~300B). So unless we find an insanely huge path
+ * of 64KB, we should never reach the 65KB limit of one pkt-line. If
+ * that does happen, we let the sequential code handle the item.
+ */
+ if (packed_item_size > LARGE_PACKET_DATA_MAX)
+ return 0;
+
c = classify_conv_attrs(ca);
switch (c) {
case CA_CLASS_INCORE:
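
Note: the size check above counts the bytes of the fixed-width header that
send_one_item() (added further down in this patch) packs in front of the
path. A minimal sketch of that header, inferred from the fields populated
there (the authoritative definition lives in parallel-checkout.h):

    struct pc_item_fixed_portion {
        size_t id;
        struct object_id oid;
        unsigned int ce_mode;
        enum convert_crlf_action crlf_action;
        int ident;
        size_t working_tree_encoding_len;
        size_t name_len;
    };
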
@@ -121,10 +120,12 @@ int enqueue_checkout(struct cache_entry *ce, struct conv_attrs *ca)
ALLOC_GROW(parallel_checkout.items, parallel_checkout.nr + 1,
parallel_checkout.alloc);
- pc_item = &parallel_checkout.items[parallel_checkout.nr++];
+ pc_item = &parallel_checkout.items[parallel_checkout.nr];
pc_item->ce = ce;
memcpy(&pc_item->ca, ca, sizeof(pc_item->ca));
pc_item->status = PC_ITEM_PENDING;
+ pc_item->id = parallel_checkout.nr;
+ parallel_checkout.nr++;
return 0;
}
@@ -236,7 +237,8 @@ static int write_pc_item_to_fd(struct parallel_checkout_item *pc_item, int fd,
/*
* checkout metadata is used to give context for external process
* filters. Files requiring such filters are not eligible for parallel
- * checkout, so pass NULL.
+ * checkout, so pass NULL. Note: if that changes, the metadata must also
+ * be passed from the main process to the workers.
*/
ret = convert_to_working_tree_ca(&pc_item->ca, pc_item->ce->name,
blob, size, &buf, NULL);
@@ -268,8 +270,8 @@ static int close_and_clear(int *fd)
return ret;
}
-static void write_pc_item(struct parallel_checkout_item *pc_item,
- struct checkout *state)
+void write_pc_item(struct parallel_checkout_item *pc_item,
+ struct checkout *state)
{
unsigned int mode = (pc_item->ce->ce_mode & 0100) ? 0777 : 0666;
int fd = -1, fstat_done = 0;
@@ -340,6 +342,240 @@ out:
strbuf_release(&path);
}
+static void send_one_item(int fd, struct parallel_checkout_item *pc_item)
+{
+ size_t len_data;
+ char *data, *variant;
+ struct pc_item_fixed_portion *fixed_portion;
+ const char *working_tree_encoding = pc_item->ca.working_tree_encoding;
+ size_t name_len = pc_item->ce->ce_namelen;
+ size_t working_tree_encoding_len = working_tree_encoding ?
+ strlen(working_tree_encoding) : 0;
+
+ /*
+ * Any changes in the calculation of the message size must also be made
+ * in is_eligible_for_parallel_checkout().
+ */
+ len_data = sizeof(struct pc_item_fixed_portion) + name_len +
+ working_tree_encoding_len;
+
+ data = xcalloc(1, len_data);
+
+ fixed_portion = (struct pc_item_fixed_portion *)data;
+ fixed_portion->id = pc_item->id;
+ fixed_portion->ce_mode = pc_item->ce->ce_mode;
+ fixed_portion->crlf_action = pc_item->ca.crlf_action;
+ fixed_portion->ident = pc_item->ca.ident;
+ fixed_portion->name_len = name_len;
+ fixed_portion->working_tree_encoding_len = working_tree_encoding_len;
+ /*
+ * We use hashcpy() instead of oidcpy() because the hash[] positions
+ * after `the_hash_algo->rawsz` might not be initialized. And Valgrind
+ * would complain about passing uninitialized bytes to a syscall
+ * (write(2)). There is no real harm in this case, but the warning could
+ * hinder the detection of actual errors.
+ */
+ hashcpy(fixed_portion->oid.hash, pc_item->ce->oid.hash);
+
+ variant = data + sizeof(*fixed_portion);
+ if (working_tree_encoding_len) {
+ memcpy(variant, working_tree_encoding, working_tree_encoding_len);
+ variant += working_tree_encoding_len;
+ }
+ memcpy(variant, pc_item->ce->name, name_len);
+
+ packet_write(fd, data, len_data);
+
+ free(data);
+}
+
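
Note: the pkt-line payload built by send_one_item() is a fixed-width header
followed by two variable-length fields with no separators, since both
lengths travel inside the header itself:

    | pc_item_fixed_portion | working_tree_encoding (if any) | ce->name |

A receiver can split this unambiguously by reading
sizeof(struct pc_item_fixed_portion) bytes, then working_tree_encoding_len
bytes, then name_len bytes.
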
+static void send_batch(int fd, size_t start, size_t nr)
+{
+ size_t i;
+ sigchain_push(SIGPIPE, SIG_IGN);
+ for (i = 0; i < nr; i++)
+ send_one_item(fd, &parallel_checkout.items[start + i]);
+ packet_flush(fd);
+ sigchain_pop(SIGPIPE);
+}
+
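
Note: the sigchain_push(SIGPIPE, SIG_IGN)/sigchain_pop() pair in
send_batch() means that a worker dying mid-batch surfaces as a failed
write (EPIPE) on the main process' side, i.e. a regular error path,
instead of killing the main process with SIGPIPE.
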
+static struct pc_worker *setup_workers(struct checkout *state, int num_workers)
+{
+ struct pc_worker *workers;
+ int i, workers_with_one_extra_item;
+ size_t base_batch_size, batch_beginning = 0;
+
+ ALLOC_ARRAY(workers, num_workers);
+
+ for (i = 0; i < num_workers; i++) {
+ struct child_process *cp = &workers[i].cp;
+
+ child_process_init(cp);
+ cp->git_cmd = 1;
+ cp->in = -1;
+ cp->out = -1;
+ cp->clean_on_exit = 1;
+ strvec_push(&cp->args, "checkout--worker");
+ if (state->base_dir_len)
+ strvec_pushf(&cp->args, "--prefix=%s", state->base_dir);
+ if (start_command(cp))
+ die("failed to spawn checkout worker");
+ }
+
+ base_batch_size = parallel_checkout.nr / num_workers;
+ workers_with_one_extra_item = parallel_checkout.nr % num_workers;
+
+ for (i = 0; i < num_workers; i++) {
+ struct pc_worker *worker = &workers[i];
+ size_t batch_size = base_batch_size;
+
+ /* distribute the extra work evenly */
+ if (i < workers_with_one_extra_item)
+ batch_size++;
+
+ send_batch(worker->cp.in, batch_beginning, batch_size);
+ worker->next_item_to_complete = batch_beginning;
+ worker->nr_items_to_complete = batch_size;
+
+ batch_beginning += batch_size;
+ }
+
+ return workers;
+}
+
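
Note: the batch split in setup_workers() is a plain even division with the
remainder spread over the first workers. A worked example with illustrative
numbers (not from the patch): for parallel_checkout.nr == 10 and
num_workers == 3, base_batch_size is 3 and workers_with_one_extra_item is
1, so the workers are sent the contiguous ranges [0,4), [4,7) and [7,10).
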
+static void finish_workers(struct pc_worker *workers, int num_workers)
+{
+ int i;
+
+ /*
+ * Close pipes before calling finish_command() to let the workers
+ * exit asynchronously and avoid spending extra time on wait().
+ */
+ for (i = 0; i < num_workers; i++) {
+ struct child_process *cp = &workers[i].cp;
+ if (cp->in >= 0)
+ close(cp->in);
+ if (cp->out >= 0)
+ close(cp->out);
+ }
+
+ for (i = 0; i < num_workers; i++) {
+ int rc = finish_command(&workers[i].cp);
+ if (rc > 128) {
+ /*
+ * For a normal non-zero exit, the worker should have
+ * already printed something useful to stderr. But a
+ * death by signal should be mentioned to the user.
+ */
+ error("checkout worker %d died of signal %d", i, rc - 128);
+ }
+ }
+
+ free(workers);
+}
+
+static inline void assert_pc_item_result_size(int got, int exp)
+{
+ if (got != exp)
+ BUG("wrong result size from checkout worker (got %dB, exp %dB)",
+ got, exp);
+}
+
+static void parse_and_save_result(const char *buffer, int len,
+ struct pc_worker *worker)
+{
+ struct pc_item_result *res;
+ struct parallel_checkout_item *pc_item;
+ struct stat *st = NULL;
+
+ if (len < PC_ITEM_RESULT_BASE_SIZE)
+ BUG("too short result from checkout worker (got %dB, exp >=%dB)",
+ len, (int)PC_ITEM_RESULT_BASE_SIZE);
+
+ res = (struct pc_item_result *)buffer;
+
+ /*
+ * The worker should send either the full result struct on success,
+ * or just the base (i.e. no stat data) otherwise.
+ */
+ if (res->status == PC_ITEM_WRITTEN) {
+ assert_pc_item_result_size(len, (int)sizeof(struct pc_item_result));
+ st = &res->st;
+ } else {
+ assert_pc_item_result_size(len, (int)PC_ITEM_RESULT_BASE_SIZE);
+ }
+
+ if (!worker->nr_items_to_complete)
+ BUG("received result from supposedly finished checkout worker");
+ if (res->id != worker->next_item_to_complete)
+ BUG("unexpected item id from checkout worker (got %"PRIuMAX", exp %"PRIuMAX")",
+ (uintmax_t)res->id, (uintmax_t)worker->next_item_to_complete);
+
+ worker->next_item_to_complete++;
+ worker->nr_items_to_complete--;
+
+ pc_item = &parallel_checkout.items[res->id];
+ pc_item->status = res->status;
+ if (st)
+ pc_item->st = *st;
+}
+
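
Note: parse_and_save_result() treats each message as a prefix of
struct pc_item_result: the stat data is only appended when the entry was
actually written. A minimal sketch of the assumed layout, inferred from
the accesses above (the real definition lives in parallel-checkout.h):

    struct pc_item_result {
        size_t id;
        enum pc_item_status status;
        struct stat st; /* only sent when status == PC_ITEM_WRITTEN */
    };

    #define PC_ITEM_RESULT_BASE_SIZE offsetof(struct pc_item_result, st)
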
+static void gather_results_from_workers(struct pc_worker *workers,
+ int num_workers)
+{
+ int i, active_workers = num_workers;
+ struct pollfd *pfds;
+
+ CALLOC_ARRAY(pfds, num_workers);
+ for (i = 0; i < num_workers; i++) {
+ pfds[i].fd = workers[i].cp.out;
+ pfds[i].events = POLLIN;
+ }
+
+ while (active_workers) {
+ int nr = poll(pfds, num_workers, -1);
+
+ if (nr < 0) {
+ if (errno == EINTR)
+ continue;
+ die_errno("failed to poll checkout workers");
+ }
+
+ for (i = 0; i < num_workers && nr > 0; i++) {
+ struct pc_worker *worker = &workers[i];
+ struct pollfd *pfd = &pfds[i];
+
+ if (!pfd->revents)
+ continue;
+
+ if (pfd->revents & POLLIN) {
+ int len = packet_read(pfd->fd, NULL, NULL,
+ packet_buffer,
+ sizeof(packet_buffer), 0);
+
+ if (len < 0) {
+ BUG("packet_read() returned negative value");
+ } else if (!len) {
+ pfd->fd = -1;
+ active_workers--;
+ } else {
+ parse_and_save_result(packet_buffer,
+ len, worker);
+ }
+ } else if (pfd->revents & POLLHUP) {
+ pfd->fd = -1;
+ active_workers--;
+ } else if (pfd->revents & (POLLNVAL | POLLERR)) {
+ die("error polling from checkout worker");
+ }
+
+ nr--;
+ }
+ }
+
+ free(pfds);
+}
+
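
Note: setting pfd->fd to -1 is the idiomatic way to retire a worker from
the poll set: poll(2) ignores entries with a negative fd and reports zero
revents for them, so finished workers drop out without compacting the
array.
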
static void write_items_sequentially(struct checkout *state)
{
size_t i;
@@ -348,16 +584,28 @@ static void write_items_sequentially(struct checkout *state)
write_pc_item(&parallel_checkout.items[i], state);
}
+static const int DEFAULT_NUM_WORKERS = 2;
+
int run_parallel_checkout(struct checkout *state)
{
- int ret;
+ int ret, num_workers = DEFAULT_NUM_WORKERS;
if (parallel_checkout.status != PC_ACCEPTING_ENTRIES)
BUG("cannot run parallel checkout: uninitialized or already running");
parallel_checkout.status = PC_RUNNING;
- write_items_sequentially(state);
+ if (parallel_checkout.nr < num_workers)
+ num_workers = parallel_checkout.nr;
+
+ if (num_workers <= 1) {
+ write_items_sequentially(state);
+ } else {
+ struct pc_worker *workers = setup_workers(state, num_workers);
+ gather_results_from_workers(workers, num_workers);
+ finish_workers(workers, num_workers);
+ }
+
ret = handle_results(state);
finish_parallel_checkout();
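
Note: putting the pieces together, a caller of this API would look roughly
like the hypothetical driver below. In Git proper the enqueueing happens
inside checkout_entry_ca() in entry.c; treat the exact signatures here as
illustrative:

    struct checkout state = CHECKOUT_INIT;
    struct conv_attrs ca;

    init_parallel_checkout();

    /* for each cache_entry `ce` to be checked out: */
    convert_attrs(state.istate, &ca, ce->name);
    if (enqueue_checkout(ce, &ca))
        /* ineligible for parallel checkout: write it sequentially */
        checkout_entry_ca(ce, &ca, &state, NULL, NULL);

    /* once everything is enqueued, drive the worker pool */
    run_parallel_checkout(&state);
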