From cb1f33c6ae840e8dc0f43518daf76e6ed01034f0 Mon Sep 17 00:00:00 2001
From: Joseph Redmon
Date: Mon, 8 Dec 2014 11:48:57 -0800
Subject: [PATCH] Fixed race condition in server

---
 src/activations.c         |  2 --
 src/axpy.c                |  3 ---
 src/cnn.c                 | 37 +++++++++++++++++++++++++++++++++++--
 src/col2im.c              |  1 -
 src/convolutional_layer.c |  2 --
 src/cost_layer.c          |  1 -
 src/gemm.c                |  3 ---
 src/im2col.c              |  2 --
 src/maxpool_layer.c       |  2 --
 src/opencl.c              | 15 ++++-----------
 src/opencl.h              |  2 +-
 src/server.c              | 26 ++++++++++++++++----------
 src/softmax_layer.c       |  1 -
 13 files changed, 56 insertions(+), 41 deletions(-)

diff --git a/src/activations.c b/src/activations.c
index db68101d..4232efa7 100644
--- a/src/activations.c
+++ b/src/activations.c
@@ -116,7 +116,6 @@ cl_kernel get_activation_kernel()
 
 void activate_array_ongpu(cl_mem x, int n, ACTIVATION a)
 {
-    cl_setup();
     cl_kernel kernel = get_activation_kernel();
 
     cl_command_queue queue = cl.queue;
@@ -145,7 +144,6 @@ cl_kernel get_gradient_kernel()
 
 void gradient_array_ongpu(cl_mem x, int n, ACTIVATION a, cl_mem delta)
 {
-    cl_setup();
     cl_kernel kernel = get_gradient_kernel();
 
     cl_command_queue queue = cl.queue;
diff --git a/src/axpy.c b/src/axpy.c
index 21293b36..857579ca 100644
--- a/src/axpy.c
+++ b/src/axpy.c
@@ -70,7 +70,6 @@ void axpy_ongpu(int N, float ALPHA, cl_mem X, int INCX, cl_mem Y, int INCY)
 
 void axpy_ongpu_offset(int N, float ALPHA, cl_mem X, int OFFX, int INCX, cl_mem Y, int OFFY, int INCY)
 {
-    cl_setup();
     cl_kernel kernel = get_axpy_kernel();
 
     cl_command_queue queue = cl.queue;
@@ -97,7 +96,6 @@ void copy_ongpu(int N, cl_mem X, int INCX, cl_mem Y, int INCY)
 }
 void copy_ongpu_offset(int N, cl_mem X, int OFFX, int INCX, cl_mem Y, int OFFY, int INCY)
 {
-    cl_setup();
     cl_kernel kernel = get_copy_kernel();
 
     cl_command_queue queue = cl.queue;
@@ -118,7 +116,6 @@ void copy_ongpu_offset(int N, cl_mem X, int OFFX, int INCX, cl_mem Y, int OFFY,
 }
 void scal_ongpu(int N, float ALPHA, cl_mem X, int INCX)
 {
-    cl_setup();
     cl_kernel kernel = get_scal_kernel();
 
     cl_command_queue queue = cl.queue;
diff --git a/src/cnn.c b/src/cnn.c
index 8279c2f8..f40e9a99 100644
--- a/src/cnn.c
+++ b/src/cnn.c
@@ -666,6 +666,28 @@ void train_nist()
     }
 }
 
+void train_nist_distributed(char *address)
+{
+    srand(time(0));
+    network net = parse_network_cfg("cfg/nist.client");
+    data train = load_categorical_data_csv("data/mnist/mnist_train.csv", 0, 10);
+    //data test = load_categorical_data_csv("data/mnist/mnist_test.csv",0,10);
+    normalize_data_rows(train);
+    //normalize_data_rows(test);
+    int count = 0;
+    int iters = 50000/net.batch;
+    iters = 1000/net.batch + 1;
+    while(++count <= 2000){
+        clock_t start = clock(), end;
+        float loss = train_network_sgd_gpu(net, train, iters);
+        client_update(net, address);
+        end = clock();
+        //float test_acc = network_accuracy_gpu(net, test);
+        //float test_acc = 0;
+        printf("%d: Loss: %f, Time: %lf seconds\n", count, loss, (float)(end-start)/CLOCKS_PER_SEC);
+    }
+}
+
 void test_ensemble()
 {
     int i;
@@ -875,7 +897,7 @@ void test_correct_alexnet()
 void run_server()
 {
     srand(time(0));
-    network net = parse_network_cfg("cfg/alexnet.server");
+    network net = parse_network_cfg("cfg/nist.server");
     server_update(net);
 }
 void test_client()
@@ -891,12 +913,23 @@ void test_client()
     printf("Transfered: %lf seconds\n", sec(clock()-time));
 }
 
+int find_int_arg(int argc, char* argv[], char *arg)
+{
+    int i;
+    for(i = 0; i < argc-1; ++i) if(0==strcmp(argv[i], arg)) return atoi(argv[i+1]);
+    return 0;
+}
+
 int main(int argc, char *argv[])
 {
     if(argc < 2){
         fprintf(stderr, "usage: %s \n", argv[0]);
         return 0;
     }
+    int index = find_int_arg(argc, argv, "-i");
+    #ifdef GPU
+    cl_setup(index);
+    #endif
     if(0==strcmp(argv[1], "train")) train_imagenet();
     else if(0==strcmp(argv[1], "detection")) train_detection_net();
     else if(0==strcmp(argv[1], "asirra")) train_asirra();
@@ -912,7 +945,7 @@ int main(int argc, char *argv[])
         fprintf(stderr, "usage: %s \n", argv[0]);
         return 0;
     }
-    else if(0==strcmp(argv[1], "client")) train_imagenet_distributed(argv[2]);
+    else if(0==strcmp(argv[1], "client")) train_nist_distributed(argv[2]);
     else if(0==strcmp(argv[1], "visualize")) test_visualize(argv[2]);
     else if(0==strcmp(argv[1], "valid")) validate_imagenet(argv[2]);
     fprintf(stderr, "Success!\n");
diff --git a/src/col2im.c b/src/col2im.c
index c11f4535..f986071e 100644
--- a/src/col2im.c
+++ b/src/col2im.c
@@ -61,7 +61,6 @@ void col2im_ongpu(cl_mem data_col, int offset,
         int channels, int height, int width,
         int ksize, int stride, int pad, cl_mem data_im)
 {
-    cl_setup();
     cl_kernel kernel = get_col2im_kernel();
 
     cl_command_queue queue = cl.queue;
diff --git a/src/convolutional_layer.c b/src/convolutional_layer.c
index 5b4e0b5b..bb2135fa 100644
--- a/src/convolutional_layer.c
+++ b/src/convolutional_layer.c
@@ -280,7 +280,6 @@ void learn_bias_convolutional_layer_ongpu(convolutional_layer layer)
 {
     int size = convolutional_out_height(layer)
         * convolutional_out_width(layer);
-    cl_setup();
     cl_kernel kernel = get_convolutional_learn_bias_kernel();
 
     cl_command_queue queue = cl.queue;
@@ -315,7 +314,6 @@ void bias_output_gpu(const convolutional_layer layer)
     int out_w = convolutional_out_width(layer);
     int size = out_h*out_w;
 
-    cl_setup();
     cl_kernel kernel = get_convolutional_bias_kernel();
 
     cl_command_queue queue = cl.queue;
diff --git a/src/cost_layer.c b/src/cost_layer.c
index 08d3bb5a..69519565 100644
--- a/src/cost_layer.c
+++ b/src/cost_layer.c
@@ -75,7 +75,6 @@ cl_kernel get_mask_kernel()
 
 void mask_ongpu(int n, cl_mem x, cl_mem mask, int mod)
 {
-    cl_setup();
     cl_kernel kernel = get_mask_kernel();
 
     cl_command_queue queue = cl.queue;
diff --git a/src/gemm.c b/src/gemm.c
index afeb46a4..d1782b1a 100644
--- a/src/gemm.c
+++ b/src/gemm.c
@@ -178,14 +178,12 @@ void gemm_ongpu_offset(int TA, int TB, int M, int N, int K, float ALPHA,
         cl_mem C_gpu, int c_off, int ldc)
 {
 #ifdef CLBLAS
-    cl_setup();
     cl_command_queue queue = cl.queue;
     cl_event event;
     cl.error = clblasSgemm(clblasRowMajor, TA?clblasTrans:clblasNoTrans, TB?clblasTrans:clblasNoTrans,M, N, K,ALPHA, A_gpu, a_off, lda,B_gpu, b_off, ldb,BETA, C_gpu, c_off, ldc,1, &queue, 0, NULL, &event);
     check_error(cl);
 #else
     //printf("gpu: %d %d %d %d %d\n",TA, TB, M, N, K);
-    cl_setup();
     cl_kernel gemm_kernel = get_gemm_kernel();
     if(!TA && !TB) gemm_kernel = get_gemm_nn_kernel();
     if(!TA && TB) gemm_kernel = get_gemm_nt_kernel();
@@ -225,7 +223,6 @@ void gemm_gpu(int TA, int TB, int M, int N, int K, float ALPHA,
         float BETA,
         float *C, int ldc)
 {
-    cl_setup();
     cl_context context = cl.context;
 
     cl_command_queue queue = cl.queue;
diff --git a/src/im2col.c b/src/im2col.c
index 18e6c0e9..2af13e9d 100644
--- a/src/im2col.c
+++ b/src/im2col.c
@@ -75,7 +75,6 @@ void im2col_ongpu(cl_mem data_im, int offset,
         int channels, int height, int width,
         int ksize, int stride, int pad, cl_mem data_col)
 {
-    cl_setup();
 
     int height_col = (height - ksize) / stride + 1;
     int width_col = (width - ksize) / stride + 1;
@@ -113,7 +112,6 @@ void im2col_ongpu(cl_mem data_im, int offset,
         int channels, int height, int width,
         int ksize, int stride, int pad, float *data_col)
 {
-    cl_setup();
     cl_context context = cl.context;
 
     cl_command_queue queue = cl.queue;
diff --git a/src/maxpool_layer.c b/src/maxpool_layer.c
index df190408..c05e9393 100644
--- a/src/maxpool_layer.c
+++ b/src/maxpool_layer.c
@@ -115,7 +115,6 @@ void forward_maxpool_layer_gpu(maxpool_layer layer, cl_mem input)
     int h = (layer.h-1)/layer.stride + 1;
     int w = (layer.w-1)/layer.stride + 1;
     int c = layer.c;
-    cl_setup();
     cl_kernel kernel = get_forward_kernel();
 
     cl_command_queue queue = cl.queue;
@@ -149,7 +148,6 @@ cl_kernel get_backward_kernel()
 
 void backward_maxpool_layer_gpu(maxpool_layer layer, cl_mem delta)
 {
-    cl_setup();
     cl_kernel kernel = get_backward_kernel();
 
     cl_command_queue queue = cl.queue;
diff --git a/src/opencl.c b/src/opencl.c
index 55fb56c7..994b8d6b 100644
--- a/src/opencl.c
+++ b/src/opencl.c
@@ -27,7 +27,7 @@ void check_error(cl_info info)
 
 #define MAX_DEVICES 10
 
-cl_info cl_init()
+cl_info cl_init(int index)
 {
     cl_info info;
     info.initialized = 0;
@@ -87,8 +87,7 @@ cl_info cl_init()
         printf(" DEVICE_MAX_WORK_ITEM_SIZES = %u / %u / %u \n", (unsigned int)workitem_size[0], (unsigned int)workitem_size[1], (unsigned int)workitem_size[2]);
 
     }
-    int index = getpid()%num_devices;
-    index = 1;
+    index = index%num_devices;
     printf("%d rand, %d devices, %d index\n", getpid(), num_devices, index);
     info.device = devices[index];
     fprintf(stderr, "Found %d device(s)\n", num_devices);
@@ -135,17 +134,16 @@ cl_program cl_fprog(char *filename, char *options, cl_info info)
     return prog;
 }
 
-void cl_setup()
+void cl_setup(int index)
 {
     if(!cl.initialized){
         printf("initializing\n");
-        cl = cl_init();
+        cl = cl_init(index);
     }
 }
 
 cl_kernel get_kernel(char *filename, char *kernelname, char *options)
 {
-    cl_setup();
     cl_program prog = cl_fprog(filename, options, cl);
     cl_kernel kernel=clCreateKernel(prog, kernelname, &cl.error);
     check_error(cl);
@@ -154,7 +152,6 @@ cl_kernel get_kernel(char *filename, char *kernelname, char *options)
 
 void cl_read_array(cl_mem mem, float *x, int n)
 {
-    cl_setup();
     cl.error = clEnqueueReadBuffer(cl.queue, mem, CL_TRUE, 0, sizeof(float)*n,x,0,0,0);
     check_error(cl);
 }
@@ -171,14 +168,12 @@ float cl_checksum(cl_mem mem, int n)
 
 void cl_write_array(cl_mem mem, float *x, int n)
 {
-    cl_setup();
     cl.error = clEnqueueWriteBuffer(cl.queue, mem, CL_TRUE, 0,sizeof(float)*n,x,0,0,0);
     check_error(cl);
 }
 
 void cl_copy_array(cl_mem src, cl_mem dst, int n)
 {
-    cl_setup();
     cl.error = clEnqueueCopyBuffer(cl.queue, src, dst, 0, 0, sizeof(float)*n,0,0,0);
     check_error(cl);
 }
@@ -196,7 +191,6 @@ cl_mem cl_sub_array(cl_mem src, int offset, int size)
 
 cl_mem cl_make_array(float *x, int n)
 {
-    cl_setup();
     cl_mem mem = clCreateBuffer(cl.context,
             CL_MEM_READ_WRITE|CL_MEM_COPY_HOST_PTR,
             sizeof(float)*n, x, &cl.error);
@@ -207,7 +201,6 @@
 
 cl_mem cl_make_int_array(int *x, int n)
 {
-    cl_setup();
     cl_mem mem = clCreateBuffer(cl.context,
             CL_MEM_READ_WRITE|CL_MEM_COPY_HOST_PTR,
             sizeof(int)*n, x, &cl.error);
diff --git a/src/opencl.h b/src/opencl.h
index a3985a74..f93f9f37 100644
--- a/src/opencl.h
+++ b/src/opencl.h
@@ -19,7 +19,7 @@ typedef struct {
 
 extern cl_info cl;
 
-void cl_setup();
+void cl_setup(int index);
 void check_error(cl_info info);
 cl_kernel get_kernel(char *filename, char *kernelname, char *options);
 void cl_read_array(cl_mem mem, float *x, int n);
diff --git a/src/server.c b/src/server.c
index 657ea7ce..e9270114 100644
--- a/src/server.c
+++ b/src/server.c
@@ -51,20 +51,22 @@ typedef struct{
 
 void read_all(int fd, char *buffer, size_t bytes)
 {
+    //printf("Want %d\n", bytes);
     size_t n = 0;
     while(n < bytes){
         int next = read(fd, buffer + n, bytes-n);
-        if(next < 0) error("read failed");
+        if(next <= 0) error("read failed");
         n += next;
     }
 }
 
 void write_all(int fd, char *buffer, size_t bytes)
 {
+    //printf("Writ %d\n", bytes);
     size_t n = 0;
     while(n < bytes){
         int next = write(fd, buffer + n, bytes-n);
-        if(next < 0) error("write failed");
+        if(next <= 0) error("write failed");
         n += next;
     }
 }
@@ -79,8 +81,9 @@ void read_and_add_into(int fd, float *a, int n)
 
 void handle_connection(void *pointer)
 {
-    printf("New Connection\n");
     connection_info info = *(connection_info *) pointer;
+    free(pointer);
+    printf("New Connection\n");
     int fd = info.fd;
     network net = info.net;
     int i;
@@ -117,8 +120,6 @@
     }
     printf("Received updates\n");
    close(fd);
-    ++*(info.counter);
-    if(*(info.counter)%10==0) save_network(net, "/home/pjreddie/imagenet_backup/alexnet.part");
 }
 
 void server_update(network net)
@@ -128,14 +129,16 @@
     listen(fd, 10);
     struct sockaddr_in client; /* remote address */
     socklen_t client_size = sizeof(client); /* length of addresses */
-    connection_info info;
-    info.net = net;
-    info.counter = &counter;
     while(1){
+        connection_info *info = calloc(1, sizeof(connection_info));
+        info->net = net;
+        info->counter = &counter;
         pthread_t worker;
         int connection = accept(fd, (struct sockaddr *) &client, &client_size);
-        info.fd = connection;
-        pthread_create(&worker, NULL, (void *) &handle_connection, &info);
+        info->fd = connection;
+        pthread_create(&worker, NULL, (void *) &handle_connection, info);
+        ++counter;
+        if(counter%1000==0) save_network(net, "cfg/nist.part");
     }
 }
 
@@ -166,6 +169,7 @@ void client_update(network net, char *address)
     /* send a message to the server */
 
     int i;
+    //printf("Sending\n");
     for(i = 0; i < net.n; ++i){
         if(net.types[i] == CONVOLUTIONAL){
             convolutional_layer layer = *(convolutional_layer *) net.layers[i];
@@ -183,6 +187,7 @@
             memset(layer.weight_updates, 0, layer.inputs*layer.outputs*sizeof(float));
         }
     }
+    //printf("Sent\n");
 
     for(i = 0; i < net.n; ++i){
         if(net.types[i] == CONVOLUTIONAL){
@@ -203,5 +208,6 @@
             push_connected_layer(layer);
         }
     }
+    //printf("Updated\n");
     close(fd);
 }
diff --git a/src/softmax_layer.c b/src/softmax_layer.c
index abd9abfe..ffc028f7 100644
--- a/src/softmax_layer.c
+++ b/src/softmax_layer.c
@@ -69,7 +69,6 @@ cl_kernel get_softmax_forward_kernel()
 
 void forward_softmax_layer_gpu(const softmax_layer layer, cl_mem input)
 {
-    cl_setup();
     cl_kernel kernel = get_softmax_forward_kernel();
 
     cl_command_queue queue = cl.queue;