11.7. アトミック型のアルゴリズム

アトミック型のヒストグラム集計アルゴリズムは、Javaで並行・並列処理に慣れている読者には馴染みのある手法です。

各ワークグループで256要素のビン(カテゴリ)を保持するローカルメモリの空間を割り当て、そこに複数のスレッドから同時アクセスが来ても整合性がとれるように、アトミック関数を使って処理をします。

サンプルコードではこれを以下のように記述しています。

// Each work-item reads BIN_SIZE elements from its work-group's portion of data.
for(int i = 0; i < BIN_SIZE; ++i)
{
   // The group's portion starts at group_id * local_size * BIN_SIZE; on
   // iteration i this work-item reads element i * local_size + local_id of it.
   // Storing into a uchar keeps only the low 8 bits, used below as the bin index.
   uchar value = data[group_id * local_size * BIN_SIZE + i * local_size + local_id];
   // Atomically increment the counter of the selected bin.
   atomic_inc(&shared_local_memory[value]);
}

複数の並列するスレッド(ワークアイテム)がshared_local_memory変数に同時にアクセスしますが、atomic_inc関数によってアトミックな加算処理が行われるため、正しい集計結果が得られます。

HistogramAtomicTest.java. 

package com.book.jocl.histogram;

import static org.jocl.CL.*;

import java.io.File;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Paths;
import java.util.Scanner;

import org.jocl.CL;
import org.jocl.Pointer;
import org.jocl.Sizeof;
import org.jocl.cl_command_queue;
import org.jocl.cl_context;
import org.jocl.cl_context_properties;
import org.jocl.cl_device_id;
import org.jocl.cl_kernel;
import org.jocl.cl_mem;
import org.jocl.cl_platform_id;
import org.jocl.cl_program;

/**
 * Host program for the atomic histogram sample.
 *
 * <p>It builds {@code histogram_atomic.cl}, launches {@code FACTOR} work-groups of
 * {@code LOCAL_SIZE} work-items, reads back one {@code BIN_SIZE}-entry sub-histogram
 * per work-group, sums them on the host and prints the totals.
 */
public class HistogramAtomicTest {

    private static final String KERNEL_PATH = "histogram_atomic.cl";
    private static final String KERNEL_FUNC = "histogram";
    private static final int BIN_SIZE = 256;
    private static final int LOCAL_SIZE = 128;
    private static final int FACTOR = 64; // number of work-groups
    // Each work-item reads BIN_SIZE elements: 64 * 128 * 256 = 2097152
    private static final int DATA_SIZE = FACTOR * LOCAL_SIZE * BIN_SIZE;
    private static final int[] data = new int[DATA_SIZE];

    private static cl_context context;
    private static cl_command_queue queue;
    private static cl_program program;
    private static cl_kernel kernel;

    /**
     * Runs the sample on the first GPU device of the first platform.
     *
     * @param args unused
     * @throws Exception if the kernel source cannot be read; OpenCL failures are
     *         reported as exceptions because JOCL exceptions are enabled
     */
    public static void main(String[] args) throws Exception {

        CL.setExceptionsEnabled(true);

        cl_platform_id[] platform = new cl_platform_id[1];
        cl_device_id[] device = new cl_device_id[1];
        int[] num_devices = new int[1];

        clGetPlatformIDs(1, platform, null);
        clGetDeviceIDs(platform[0], CL_DEVICE_TYPE_GPU, 1, device, num_devices);

        cl_context_properties props = new cl_context_properties();
        props.addProperty(CL_CONTEXT_PLATFORM, platform[0]);
        context = clCreateContext(props, 1, device, null, null, null);

        queue = clCreateCommandQueue(context, device[0], 0, null);

        program = clCreateProgramWithSource(context, 1, new String[] {readKernelSource()}, null, null);
        // Keep the kernel's BIN_SIZE macro in sync with the host constant.
        String option = "-Werror -DBIN_SIZE=" + BIN_SIZE;
        clBuildProgram(program, 0, null, option, null, null);

        // The input must be filled BEFORE the buffer is created: the buffer takes
        // a copy of the array at creation time.
        generateSample();

        // COPY_HOST_PTR instead of USE_HOST_PTR: USE_HOST_PTR requires the host
        // memory to stay valid at a fixed address for the buffer's lifetime,
        // which a Java heap array does not guarantee.
        cl_mem data_mem = clCreateBuffer(context, CL_MEM_READ_ONLY | CL_MEM_COPY_HOST_PTR,
                (long) Sizeof.cl_uint * DATA_SIZE, Pointer.to(data), null);

        // One BIN_SIZE-entry sub-histogram per work-group.
        cl_mem subHistogram = clCreateBuffer(context, CL_MEM_WRITE_ONLY | CL_MEM_ALLOC_HOST_PTR,
                (long) Sizeof.cl_uint * BIN_SIZE * FACTOR, null, null);

        kernel = clCreateKernel(program, KERNEL_FUNC, null);

        clSetKernelArg(kernel, 0, Sizeof.cl_mem, Pointer.to(data_mem));
        // A null pointer with a size requests a local-memory allocation for the argument.
        clSetKernelArg(kernel, 1, (long) Sizeof.cl_uint * BIN_SIZE, null);
        clSetKernelArg(kernel, 2, Sizeof.cl_mem, Pointer.to(subHistogram));

        // Number of work-groups = global size / local size = FACTOR (8192 / 128 = 64).
        long[] global_work_size = new long[]{(long) FACTOR * LOCAL_SIZE, 1, 1};
        long[] local_work_size = new long[]{LOCAL_SIZE, 1, 1};
        clEnqueueNDRangeKernel(queue, kernel, 1, null, global_work_size, local_work_size, 0, null, null);

        // Blocking map for READ access: the host only reads the results.
        ByteBuffer output = clEnqueueMapBuffer(queue,
                subHistogram,
                CL_TRUE,
                CL_MAP_READ,
                0,
                (long) Sizeof.cl_uint * BIN_SIZE * FACTOR,
                0,
                null,
                null,
                null);
        output.order(ByteOrder.LITTLE_ENDIAN);

        // The mapped region must be consumed before it is unmapped.
        int[] result = sumSubHistograms(output, FACTOR, BIN_SIZE);

        clEnqueueUnmapMemObject(queue, subHistogram, output, 0, null, null);
        clFinish(queue);

        for (int i = 0; i < BIN_SIZE; ++i) {
            System.out.printf("TOTAL[%d]: %d\n", i, result[i]);
        }

        // Release in reverse order of creation. The device came from
        // clGetDeviceIDs (a root device), so there is nothing to release for it.
        clReleaseMemObject(subHistogram);
        clReleaseMemObject(data_mem);
        clReleaseKernel(kernel);
        clReleaseProgram(program);
        clReleaseCommandQueue(queue);
        clReleaseContext(context);
    }

    /**
     * Reads the kernel source file located next to this class on the classpath.
     *
     * @return the kernel source, one {@code '\n'}-terminated line per input line
     * @throws IllegalStateException if the resource is not on the classpath
     * @throws java.io.IOException if the file cannot be opened
     * @throws java.net.URISyntaxException if the resource URL is not a valid URI
     */
    private static String readKernelSource() throws java.io.IOException, java.net.URISyntaxException {
        URL resource = HistogramAtomicTest.class.getResource(KERNEL_PATH);
        if (resource == null) {
            throw new IllegalStateException("Kernel source not found on classpath: " + KERNEL_PATH);
        }
        File file = Paths.get(resource.toURI()).toFile();
        StringBuilder sb = new StringBuilder();
        try (Scanner sc = new Scanner(file)) {
            while (sc.hasNextLine()) {
                sb.append(sc.nextLine()).append('\n');
            }
        }
        return sb.toString();
    }

    /**
     * Sums per-work-group sub-histograms into a single histogram.
     *
     * @param buffer sub-histograms stored group after group, {@code bins} 32-bit
     *        counters each, read from the buffer's current position
     * @param groups number of sub-histograms in the buffer
     * @param bins number of counters per sub-histogram
     * @return an array of {@code bins} totals
     */
    private static int[] sumSubHistograms(ByteBuffer buffer, int groups, int bins) {
        int[] totals = new int[bins];
        for (int group = 0; group < groups; ++group) {
            for (int bin = 0; bin < bins; ++bin) {
                totals[bin] += buffer.getInt();
            }
        }
        return totals;
    }

    /** Fills the input array with the repeating pattern 0, 1, ..., 15. */
    private static void generateSample() {
        for (int i = 0; i < DATA_SIZE; i++) {
            data[i] = i % 16;
        }
    }

}

histogram_atomic.cl. 

/*
 * Builds one partial histogram per work-group using atomics on local memory.
 *
 * data                 input values; each work-group reads a contiguous portion
 *                      of local_size * BIN_SIZE elements
 * shared_local_memory  per-work-group scratch holding BIN_SIZE counters
 * buckets              output; work-group g writes its BIN_SIZE counters to
 *                      buckets[g * BIN_SIZE .. g * BIN_SIZE + BIN_SIZE - 1]
 *
 * BIN_SIZE is supplied as a build option (-DBIN_SIZE=...). Bin indices are
 * taken from the low 8 bits of each input value, so BIN_SIZE is expected to be
 * at least 256.
 */
__kernel void histogram(
                __global const uint* data,
                __local uint* shared_local_memory,
        __global uint* buckets)
{

     size_t local_id   = get_local_id(0);
     size_t group_id   = get_group_id(0);
     size_t local_size = get_local_size(0);

     // Zero the counters cooperatively: each work-item clears every
     // local_size-th bin, so all BIN_SIZE bins are covered exactly once
     // for any work-group size.
     for(size_t i = local_id; i < BIN_SIZE; i += local_size)
         shared_local_memory[i] = 0;
     // All counters must be zero before any work-item starts counting.
     barrier(CLK_LOCAL_MEM_FENCE);

     // Start of this work-group's portion of the input.
     size_t base = group_id * local_size * BIN_SIZE;
     for(size_t i = 0; i < BIN_SIZE; ++i)
     {
       // Keep only the low 8 bits as the bin index.
       uchar value = (uchar) data[base + i * local_size + local_id];
       // Several work-items may hit the same bin at once, hence the atomic.
       atomic_inc(&shared_local_memory[value]);
     }
     // Every work-item must finish counting before the result is copied out.
     barrier(CLK_LOCAL_MEM_FENCE);

     // Copy the finished sub-histogram to global memory, again striding by
     // the work-group size so the work is shared among the work-items.
     for(size_t i = local_id; i < BIN_SIZE; i += local_size)
         buckets[group_id * BIN_SIZE + i] = shared_local_memory[i];
}

Copyright 2018-2019, by Masaki Komatsu