From 28e21152728cbea617948671df064ec75c7953e5 Mon Sep 17 00:00:00 2001
From: Joseph Redmon
Date: Sun, 7 Dec 2014 00:41:26 -0800
Subject: [PATCH] Distributed training

---
 Makefile                  |   2 +-
 src/cnn.c                 |  66 ++++++++--
 src/connected_layer.c     |   4 +
 src/convolutional_layer.c |   9 +-
 src/convolutional_layer.h |   4 -
 src/opencl.c              |   2 +-
 src/server.c              | 247 ++++++++++++++++++++++++--------------
 src/server.h              |   2 +-
 src/utils.c               |   3 +-
 9 files changed, 229 insertions(+), 110 deletions(-)

diff --git a/Makefile b/Makefile
index c2eecd5f..32479995 100644
--- a/Makefile
+++ b/Makefile
@@ -28,7 +28,7 @@ endif
 endif
 CFLAGS= $(COMMON) $(OPTS)
 #CFLAGS= $(COMMON) -O0 -g
-LDFLAGS+=`pkg-config --libs opencv` -lm
+LDFLAGS+=`pkg-config --libs opencv` -lm -pthread
 VPATH=./src/
 EXEC=cnn
 OBJDIR=./obj/
diff --git a/src/cnn.c b/src/cnn.c
index 46248eda..7971b957 100644
--- a/src/cnn.c
+++ b/src/cnn.c
@@ -8,6 +8,7 @@
 #include "matrix.h"
 #include "utils.h"
 #include "mini_blas.h"
+#include "server.h"
 
 #include <time.h>
 #include <stdlib.h>
@@ -370,15 +371,52 @@ void train_detection_net()
     }
 }
 
+void train_imagenet_distributed(char *address)
+{
+    float avg_loss = 1;
+    srand(0);
+    network net = parse_network_cfg("cfg/alexnet.client");
+    printf("Learning Rate: %g, Momentum: %g, Decay: %g\n", net.learning_rate, net.momentum, net.decay);
+    int imgs = 1000/net.batch+1;
+    imgs = 1;
+    int i = 0;
+    char **labels = get_labels("/home/pjreddie/data/imagenet/cls.labels.list");
+    list *plist = get_paths("/data/imagenet/cls.train.list");
+    char **paths = (char **)list_to_array(plist);
+    printf("%d\n", plist->size);
+    clock_t time;
+    while(1){
+        i += 1;
+        time=clock();
+        data train = load_data_random(imgs*net.batch, paths, plist->size, labels, 1000, 256, 256);
+        //translate_data_rows(train, -144);
+        normalize_data_rows(train);
+        printf("Loaded: %lf seconds\n", sec(clock()-time));
+        time=clock();
+#ifdef GPU
+        float loss = train_network_data_gpu(net, train, imgs);
+        client_update(net, address);
+        avg_loss = avg_loss*.9 + loss*.1;
+        printf("%d: %f, %f avg, %lf seconds, %d images\n", i, loss, avg_loss, sec(clock()-time), i*imgs*net.batch);
+#endif
+        free_data(train);
+        if(i%10==0){
+            char buff[256];
+            sprintf(buff, "/home/pjreddie/imagenet_backup/alexnet_%d.cfg", i);
+            save_network(net, buff);
+        }
+    }
+}
+
 void train_imagenet()
 {
     float avg_loss = 1;
     //network net = parse_network_cfg("/home/pjreddie/imagenet_backup/alexnet_1270.cfg");
-    network net = parse_network_cfg("cfg/alexnet.part");
+    srand(0);
+    network net = parse_network_cfg("cfg/alexnet.cfg");
     printf("Learning Rate: %g, Momentum: %g, Decay: %g\n", net.learning_rate, net.momentum, net.decay);
     int imgs = 1000/net.batch+1;
-    srand(time(0));
+    imgs=1;
     int i = 0;
     char **labels = get_labels("/home/pjreddie/data/imagenet/cls.labels.list");
     list *plist = get_paths("/data/imagenet/cls.train.list");
     char **paths = (char **)list_to_array(plist);
@@ -450,7 +488,7 @@ void draw_detection(image im, float *box)
         for(c = 0; c < 8; ++c){
             j = (r*8 + c) * 5;
             printf("Prob: %f\n", box[j]);
-            if(box[j] > .05){
+            if(box[j] > .01){
                 int d = 256/8;
                 int y = r*d+box[j+1]*d;
                 int x = c*d+box[j+2]*d;
@@ -715,6 +753,7 @@ void test_split()
     printf("%d, %d, %d\n", train.X.rows, split[0].X.rows, split[1].X.rows);
 }
 
+/*
 void test_im2row()
 {
     int h = 20;
     int w = 20;
     int c = 3;
     int stride = 1;
     int size = 11;
     image test = make_random_image(h,w,c);
     float *matrix = calloc(size*size*c*((h-size)/stride+1)*((w-size)/stride+1), sizeof(float));
     int i;
     for(i = 0; i < 1000; ++i){
         im2col_cpu(test.data, c, h, w, size, stride, 0, matrix);
@@ -734,6 +773,7 @@
         //image render = float_to_image(mh, mw, mc, matrix);
     }
 }
+*/
 
 void flip_network()
 {
@@ -830,15 +870,23 @@ void test_correct_alexnet()
 #endif
 }
 
-void test_server()
+void run_server()
 {
-    network net = parse_network_cfg("cfg/alexnet.test");
+    srand(0);
+    network net = parse_network_cfg("cfg/alexnet.server");
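+    /* server_update() binds SERVER_PORT, accepts client connections forever,
+     * folds each client's accumulated weight updates into this net, and sends
+     * the refreshed weights back (see handle_connection in src/server.c). */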
     server_update(net);
 }
 
 void test_client()
 {
-    network net = parse_network_cfg("cfg/alexnet.test");
-    client_update(net);
+    network net = parse_network_cfg("cfg/alexnet.client");
+    clock_t time=clock();
+    client_update(net, "localhost");
+    printf("1\n");
+    client_update(net, "localhost");
+    printf("2\n");
+    client_update(net, "localhost");
+    printf("3\n");
+    printf("Transfered: %lf seconds\n", sec(clock()-time));
 }
 
 int main(int argc, char *argv[])
@@ -853,8 +901,8 @@
     else if(0==strcmp(argv[1], "nist")) train_nist();
     else if(0==strcmp(argv[1], "test_correct")) test_correct_alexnet();
     else if(0==strcmp(argv[1], "test")) test_imagenet();
-    else if(0==strcmp(argv[1], "server")) test_server();
-    else if(0==strcmp(argv[1], "client")) test_client();
+    else if(0==strcmp(argv[1], "server")) run_server();
+    else if(0==strcmp(argv[1], "client")) train_imagenet_distributed(argv[2]);
     else if(0==strcmp(argv[1], "detect")) test_detection();
     else if(0==strcmp(argv[1], "visualize")) test_visualize(argv[2]);
     else if(0==strcmp(argv[1], "valid")) validate_imagenet(argv[2]);
diff --git a/src/connected_layer.c b/src/connected_layer.c
index 05d4a037..bcca631c 100644
--- a/src/connected_layer.c
+++ b/src/connected_layer.c
@@ -112,12 +112,16 @@ void pull_connected_layer(connected_layer layer)
 {
     cl_read_array(layer.weights_cl, layer.weights, layer.inputs*layer.outputs);
     cl_read_array(layer.biases_cl, layer.biases, layer.outputs);
+    cl_read_array(layer.weight_updates_cl, layer.weight_updates, layer.inputs*layer.outputs);
+    cl_read_array(layer.bias_updates_cl, layer.bias_updates, layer.outputs);
 }
 
 void push_connected_layer(connected_layer layer)
 {
     cl_write_array(layer.weights_cl, layer.weights, layer.inputs*layer.outputs);
     cl_write_array(layer.biases_cl, layer.biases, layer.outputs);
+    cl_write_array(layer.weight_updates_cl, layer.weight_updates, layer.inputs*layer.outputs);
+    cl_write_array(layer.bias_updates_cl, layer.bias_updates, layer.outputs);
 }
 
 void update_connected_layer_gpu(connected_layer layer)
diff --git a/src/convolutional_layer.c b/src/convolutional_layer.c
index bae06d33..4ca6104f 100644
--- a/src/convolutional_layer.c
+++ b/src/convolutional_layer.c
@@ -59,11 +59,9 @@ convolutional_layer *make_convolutional_layer(int batch, int h, int w, int c, in
     layer->filters = calloc(c*n*size*size, sizeof(float));
     layer->filter_updates = calloc(c*n*size*size, sizeof(float));
-    layer->filter_momentum = calloc(c*n*size*size, sizeof(float));
 
     layer->biases = calloc(n, sizeof(float));
     layer->bias_updates = calloc(n, sizeof(float));
-    layer->bias_momentum = calloc(n, sizeof(float));
 
     float scale = 1./(size*size*c);
     scale = .01;
     for(i = 0; i < c*n*size*size; ++i) layer->filters[i] = scale*2*(rand_uniform()-.5);
@@ -77,14 +75,13 @@ convolutional_layer *make_convolutional_layer(int batch, int h, int w, int c, in
     layer->col_image = calloc(out_h*out_w*size*size*c, sizeof(float));
     layer->output = calloc(layer->batch*out_h * out_w * n, sizeof(float));
     layer->delta = calloc(layer->batch*out_h * out_w * n, sizeof(float));
+
 #ifdef GPU
     layer->filters_cl = cl_make_array(layer->filters, c*n*size*size);
     layer->filter_updates_cl = cl_make_array(layer->filter_updates, c*n*size*size);
-    layer->filter_momentum_cl = cl_make_array(layer->filter_momentum, c*n*size*size);
 
     layer->biases_cl = cl_make_array(layer->biases, n);
     layer->bias_updates_cl = cl_make_array(layer->bias_updates, n);
-    layer->bias_momentum_cl = cl_make_array(layer->bias_momentum, n);
 
     layer->col_image_cl = cl_make_array(layer->col_image, out_h*out_w*size*size*c);
     layer->delta_cl = cl_make_array(layer->delta, layer->batch*out_h*out_w*n);
@@ -394,12 +391,16 @@ void pull_convolutional_layer(convolutional_layer layer)
 {
     cl_read_array(layer.filters_cl, layer.filters, layer.c*layer.n*layer.size*layer.size);
     cl_read_array(layer.biases_cl, layer.biases, layer.n);
+    cl_read_array(layer.filter_updates_cl, layer.filter_updates, layer.c*layer.n*layer.size*layer.size);
+    cl_read_array(layer.bias_updates_cl, layer.bias_updates, layer.n);
 }
 
 void push_convolutional_layer(convolutional_layer layer)
 {
     cl_write_array(layer.filters_cl, layer.filters, layer.c*layer.n*layer.size*layer.size);
     cl_write_array(layer.biases_cl, layer.biases, layer.n);
+    cl_write_array(layer.filter_updates_cl, layer.filter_updates, layer.c*layer.n*layer.size*layer.size);
+    cl_write_array(layer.bias_updates_cl, layer.bias_updates, layer.n);
 }
 
 void update_convolutional_layer_gpu(convolutional_layer layer)
diff --git a/src/convolutional_layer.h b/src/convolutional_layer.h
index 1ceca16c..28819bb5 100644
--- a/src/convolutional_layer.h
+++ b/src/convolutional_layer.h
@@ -18,11 +18,9 @@ typedef struct {
     int pad;
     float *filters;
     float *filter_updates;
-    float *filter_momentum;
 
     float *biases;
     float *bias_updates;
-    float *bias_momentum;
 
     float *col_image;
     float *delta;
@@ -31,11 +29,9 @@ typedef struct {
 #ifdef GPU
     cl_mem filters_cl;
     cl_mem filter_updates_cl;
-    cl_mem filter_momentum_cl;
 
     cl_mem biases_cl;
     cl_mem bias_updates_cl;
-    cl_mem bias_momentum_cl;
 
     cl_mem col_image_cl;
     cl_mem delta_cl;
diff --git a/src/opencl.c b/src/opencl.c
index 981067a7..55fb56c7 100644
--- a/src/opencl.c
+++ b/src/opencl.c
@@ -88,7 +88,7 @@ cl_info cl_init()
     }
     int index = getpid()%num_devices;
-    index = 0;
+    index = 1;
     printf("%d rand, %d devices, %d index\n", getpid(), num_devices, index);
     info.device = devices[index];
     fprintf(stderr, "Found %d device(s)\n", num_devices);
diff --git a/src/server.c b/src/server.c
index bcb59f5d..c802f84e 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1,136 +1,205 @@
+#include <sys/socket.h> /* needed for sockaddr_in */
+#include <netinet/in.h> /* needed for sockaddr_in */
+#include <unistd.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <netdb.h> /* needed for sockaddr_in */
-#include <sys/socket.h> /* needed for sockaddr_in */
-#include <netinet/in.h> /* needed for sockaddr_in */
 #include <string.h>
+#include <strings.h>
+#include <pthread.h>
+#include "mini_blas.h"
+#include "utils.h"
 #include "server.h"
 #include "connected_layer.h"
+#include "convolutional_layer.h"
 
-#define MESSAGESIZE 50012
-#define NUMFLOATS ((MESSAGESIZE-12)/4)
 #define SERVER_PORT 9876
-#define CLIENT_PORT 9879
 #define STR(x) #x
-#define PARAMETER_SERVER localhost
 
-typedef struct{
-    int layer;
-    int wob;
-    int offset;
-    float data[NUMFLOATS];
-} message;
-
-int socket_setup(int port)
+int socket_setup(int server)
 {
-    static int fd = 0; /* our socket */
-    if(fd) return fd;
-    struct sockaddr_in myaddr; /* our address */
+    int fd = 0; /* our socket */
+    struct sockaddr_in me; /* our address */
 
     /* create a UDP socket */
-    if ((fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
-        perror("cannot create socket\n");
-        fd=0;
-        return 0;
+    if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+        error("cannot create socket");
     }
 
     /* bind the socket to any valid IP address and a specific port */
+    if (server == 1){
+        bzero((char *) &me, sizeof(me));
+        me.sin_family = AF_INET;
+        me.sin_addr.s_addr = htonl(INADDR_ANY);
+        me.sin_port = htons(SERVER_PORT);
 
-    memset((char *)&myaddr, 0, sizeof(myaddr));
-    myaddr.sin_family = AF_INET;
-    myaddr.sin_addr.s_addr = htonl(INADDR_ANY);
-    myaddr.sin_port = htons(port);
-
-    if (bind(fd, (struct sockaddr *)&myaddr, sizeof(myaddr)) < 0) {
-        perror("bind failed");
-        fd=0;
-        return 0;
+        if (bind(fd, (struct sockaddr *)&me, sizeof(me)) < 0) {
+            error("bind failed");
+        }
     }
+    return fd;
 }
 
+typedef struct{
+    int fd;
+    int *counter;
+    network net;
+} connection_info;
+
+void read_all(int fd, char *buffer, size_t bytes)
+{
+    size_t n = 0;
+    while(n < bytes){
+        int next = read(fd, buffer + n, bytes-n);
+        if(next < 0) error("read failed");
+        n += next;
+    }
+}
+
+void write_all(int fd, char *buffer, size_t bytes)
+{
+    size_t n = 0;
+    while(n < bytes){
+        int next = write(fd, buffer + n, bytes-n);
+        if(next < 0) error("write failed");
+        n += next;
+    }
+}
+
+void read_and_add_into(int fd, float *a, int n)
+{
+    float *buff = calloc(n, sizeof(float));
+    read_all(fd, (char*) buff, n*sizeof(float));
+    axpy_cpu(n, 1, buff, 1, a, 1);
+    free(buff);
+}
+
+void handle_connection(void *pointer)
+{
+    printf("New Connection\n");
+    connection_info info = *(connection_info *) pointer;
+    int fd = info.fd;
+    network net = info.net;
+    ++*(info.counter);
+    int i;
+    for(i = 0; i < net.n; ++i){
+        if(net.types[i] == CONVOLUTIONAL){
+            convolutional_layer layer = *(convolutional_layer *) net.layers[i];
+
+            read_and_add_into(fd, layer.bias_updates, layer.n);
+            int num = layer.n*layer.c*layer.size*layer.size;
+            read_and_add_into(fd, layer.filter_updates, num);
+        }
+        if(net.types[i] == CONNECTED){
+            connected_layer layer = *(connected_layer *) net.layers[i];
+
+            read_and_add_into(fd, layer.bias_updates, layer.outputs);
+            read_and_add_into(fd, layer.weight_updates, layer.inputs*layer.outputs);
+        }
+    }
+    for(i = 0; i < net.n; ++i){
+        if(net.types[i] == CONVOLUTIONAL){
+            convolutional_layer layer = *(convolutional_layer *) net.layers[i];
+            update_convolutional_layer(layer);
+
+            write_all(fd, (char*) layer.biases, layer.n*sizeof(float));
+            int num = layer.n*layer.c*layer.size*layer.size;
+            write_all(fd, (char*) layer.filters, num*sizeof(float));
+        }
+        if(net.types[i] == CONNECTED){
+            connected_layer layer = *(connected_layer *) net.layers[i];
+            update_connected_layer(layer);
+            write_all(fd, (char *)layer.biases, layer.outputs*sizeof(float));
+            write_all(fd, (char *)layer.weights, layer.outputs*layer.inputs*sizeof(float));
+        }
+    }
+    printf("Received updates\n");
+    close(fd);
+}
+
 void server_update(network net)
 {
-    int fd = socket_setup(SERVER_PORT);
-    struct sockaddr_in remaddr; /* remote address */
-    socklen_t addrlen = sizeof(remaddr); /* length of addresses */
-    int recvlen; /* # bytes received */
-    unsigned char buf[MESSAGESIZE]; /* receive buffer */
-    message m;
-
-    int count = 0;
+    int fd = socket_setup(1);
+    int counter = 0;
+    listen(fd, 10);
+    struct sockaddr_in client; /* remote address */
+    socklen_t client_size = sizeof(client); /* length of addresses */
+    connection_info info;
+    info.net = net;
+    info.counter = &counter;
     while(1){
-        recvlen = recvfrom(fd, buf, MESSAGESIZE, 0, (struct sockaddr *)&remaddr, &addrlen);
-        memcpy(&m, buf, recvlen);
-        //printf("received %d bytes\n", recvlen);
-        //printf("layer %d wob %d offset %d\n", m.layer, m.wob, m.offset);
-        ++count;
-        if(count % 100 == 0) printf("%d\n", count);
+        pthread_t worker;
+        int connection = accept(fd, (struct sockaddr *) &client, &client_size);
+        info.fd = connection;
+        pthread_create(&worker, NULL, (void *) &handle_connection, &info);
     }
-    //printf("%s\n", buf);
 }
 
-void client_update(network net)
+void client_update(network net, char *address)
 {
-    int fd = socket_setup(CLIENT_PORT);
-    struct hostent *hp; /* host information */
-    struct sockaddr_in servaddr; /* server address */
-    printf("%ld %ld\n", sizeof(message), MESSAGESIZE);
-    char *my_message = "this is a test message";
+    int fd = socket_setup(0);
 
-    unsigned char buf[MESSAGESIZE];
-    message m;
+    struct hostent *hp; /* host information */
+    struct sockaddr_in server; /* server address */
 
     /* fill in the server's address and data */
-    memset((char*)&servaddr, 0, sizeof(servaddr));
-    servaddr.sin_family = AF_INET;
-    servaddr.sin_port = htons(SERVER_PORT);
+    bzero((char*)&server, sizeof(server));
+    server.sin_family = AF_INET;
+    server.sin_port = htons(SERVER_PORT);
 
     /* look up the address of the server given its name */
-    hp = gethostbyname("localhost");
+    hp = gethostbyname(address);
     if (!hp) {
+        perror("no such host");
         fprintf(stderr, "could not obtain address of %s\n", "localhost");
     }
 
     /* put the host's address into the server address structure */
-    memcpy((void *)&servaddr.sin_addr, hp->h_addr_list[0], hp->h_length);
+    memcpy((void *)&server.sin_addr, hp->h_addr_list[0], hp->h_length);
+    if (connect(fd, (struct sockaddr *) &server, sizeof(server)) < 0) {
+        error("error connecting");
+    }
 
     /* send a message to the server */
-    int i, j, k;
+    int i;
     for(i = 0; i < net.n; ++i){
+        if(net.types[i] == CONVOLUTIONAL){
+            convolutional_layer layer = *(convolutional_layer *) net.layers[i];
+            write_all(fd, (char*) layer.bias_updates, layer.n*sizeof(float));
+            int num = layer.n*layer.c*layer.size*layer.size;
+            write_all(fd, (char*) layer.filter_updates, num*sizeof(float));
+            memset(layer.bias_updates, 0, layer.n*sizeof(float));
+            memset(layer.filter_updates, 0, num*sizeof(float));
+        }
         if(net.types[i] == CONNECTED){
-            connected_layer *layer = (connected_layer *) net.layers[i];
-            m.layer = i;
-            m.wob = 0;
-            for(j = 0; j < layer->outputs; j += NUMFLOATS){
-                m.offset = j;
-
-                int num = layer->outputs - j;
-                if(NUMFLOATS < num) num = NUMFLOATS;
-
-                memcpy(m.data, &layer->bias_updates[j], num*sizeof(float));
-                memcpy(buf, &m, MESSAGESIZE);
-
-                if (sendto(fd, buf, MESSAGESIZE, 0, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
-                    perror("sendto failed");
-                }
-            }
-            m.wob = 1;
-            for(j = 0; j < layer->outputs*layer->inputs; j += NUMFLOATS){
-                m.offset = j;
-
-                int num = layer->outputs*layer->inputs - j;
-                if(NUMFLOATS < num) num = NUMFLOATS;
-
-                memcpy(m.data, &layer->weight_updates[j], num*sizeof(float));
-                memcpy(buf, &m, MESSAGESIZE);
-
-                if (sendto(fd, buf, MESSAGESIZE, 0, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
-                    perror("sendto failed");
-                }
-            }
+            connected_layer layer = *(connected_layer *) net.layers[i];
+            write_all(fd, (char *)layer.bias_updates, layer.outputs*sizeof(float));
+            write_all(fd, (char *)layer.weight_updates, layer.outputs*layer.inputs*sizeof(float));
+            memset(layer.bias_updates, 0, layer.outputs*sizeof(float));
+            memset(layer.weight_updates, 0, layer.inputs*layer.outputs*sizeof(float));
         }
     }
+
+    for(i = 0; i < net.n; ++i){
+        if(net.types[i] == CONVOLUTIONAL){
+            convolutional_layer layer = *(convolutional_layer *) net.layers[i];
+
+            read_all(fd, (char*) layer.biases, layer.n*sizeof(float));
+            int num = layer.n*layer.c*layer.size*layer.size;
+            read_all(fd, (char*) layer.filters, num*sizeof(float));
+
+            push_convolutional_layer(layer);
+        }
+        if(net.types[i] == CONNECTED){
+            connected_layer layer = *(connected_layer *) net.layers[i];
+
+            read_all(fd, (char *)layer.biases, layer.outputs*sizeof(float));
+            read_all(fd, (char *)layer.weights, layer.outputs*layer.inputs*sizeof(float));
+
+            push_connected_layer(layer);
+        }
+    }
+    close(fd);
 }
diff --git a/src/server.h b/src/server.h
index 3d8a46ba..eb9bf281 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1,4 +1,4 @@
 #include "network.h"
 
+void client_update(network net, char *address);
 void server_update(network net);
-void client_update(network net);
diff --git a/src/utils.c b/src/utils.c
index bba6218e..20cde393 100644
--- a/src/utils.c
+++ b/src/utils.c
@@ -48,7 +48,8 @@ void top_k(float *a, int n, int k, int *index)
 
 void error(char *s)
 {
-    fprintf(stderr, "Error: %s\n", s);
+    perror(s);
+    //fprintf(stderr, "Error: %s\n", s);
     exit(0);
 }