czwartek, grudnia 18, 2014

How do you copy an EMS datastore for inter-DC transport? It ought to be _fast_.

#ifndef _GNU_SOURCE
#define _GNU_SOURCE 
#endif
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <limits.h>
#include <malloc.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <dlfcn.h>
#include <execinfo.h>
#include <pthread.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/mman.h>
#include <sys/sendfile.h>
#include <errno.h>
#include <time.h>
#include <sys/syscall.h>
#include <inttypes.h>
#ifdef AIO_VIA_SYSCALL
#include <linux/aio_abi.h>
#else
#include <libaio.h>
#endif
#include <list>

/* Unbuffered logging straight to fd 1 via write(2). SYSOUTF formats into a
 * 1 KiB stack buffer first; longer messages are truncated by snprintf. */
#define SYSOUTF(msg, params...) do { char buff[1024]; memset(buff, 0, 1024); snprintf(buff, 1024, msg, params); write(1,buff,strlen(buff)); } while(0)
#define SYSOUT(msg) write(1, msg, strlen(msg))

#define ONE_MB (1024*1024)
#define ONE_GB (1024L*ONE_MB)

/* Copy strategies selectable on the command line; values index STRATEGY_DESC. */
#define STRATEGY_MMAP_WRITE  0
#define STRATEGY_MMAP_BOTH   1
#define STRATEGY_SERIAL      2
#define STRATEGY_SENDFILE    3
#define STRATEGY_AIO         4

static const char* STRATEGY_DESC[] = { "MMAP+WRITE", "MMAPx2+MEMCPY", "READ+WRITE", "SENDFILE", "AIO" };

/* Timing state used only through the TIMER_* / PRINT_WALL_TIME macros.
 * CLOCK_MONOTONIC is used because an elapsed-time measurement must not jump
 * when the system clock is stepped (NTP, settimeofday), which affects
 * CLOCK_REALTIME. */
static struct timespec g_ts, g_ts_res;
static clockid_t g_clock = CLOCK_MONOTONIC;

/* Process-wide copy state shared by all strategies and the cleanup macros. */
static int strategy = STRATEGY_MMAP_WRITE;
static int fd_in = -1, fd_out = -1, err_in = 0, err_out = 0;
static void *addr = NULL, *addr2 = NULL;
static long size_in = 0, blk_size = 0;
static int warn_mem_params_not_optimal = 1;

/* TIMER_START records the start instant in g_ts. TIMER_STOP overwrites g_ts
 * with the elapsed interval (see timer_stop), so it must run exactly once per
 * TIMER_START; a second call would subtract the interval from "now". */
#define TIMER_START() do { timer_start(&g_ts,g_clock); } while(0)
#define TIMER_STOP() do { timer_stop(&g_ts,g_clock); clock_getres(g_clock,&g_ts_res); } while(0)
#define PRINT_WALL_TIME() do { long msec = g_ts.tv_nsec/1000000L; SYSOUTF("[INFO] WALL TIME: %ldsec %ldmsec %ldusec +- %ldnsec\n",(long)g_ts.tv_sec,msec,(g_ts.tv_nsec - msec*1000000L)/1000L,g_ts_res.tv_nsec); } while(0)

/* Store the current reading of clock `clid` into *ts.
 * Returns clock_gettime()'s result; on failure the reason is also written
 * to stdout. */
static inline int timer_start(struct timespec *ts, clockid_t clid) {
    const int rc = clock_gettime(clid, ts);
    if (rc == 0)
        return rc;
    const int saved_errno = errno;
    SYSOUTF("Failed timer_start due to '%s'\r\n", strerror(saved_errno));
    return rc;
}

/* Replace *ts (a start instant from timer_start) with the time elapsed since
 * then on clock `clid`, normalised so that 0 <= tv_nsec < 1e9.
 * Returns 0 on success; if clock_gettime() fails, *ts is left untouched, the
 * reason is written to stdout and the non-zero result is returned. */
static inline int timer_stop(struct timespec *ts, clockid_t clid) {
    struct timespec now;
    const int rc = clock_gettime(clid, &now);
    if (rc != 0) {
        const int saved_errno = errno;
        SYSOUTF("Failed timer_stop due to '%s'\r\n", strerror(saved_errno));
        return rc;
    }
    time_t elapsed_sec = now.tv_sec - ts->tv_sec;
    long elapsed_nsec = now.tv_nsec - ts->tv_nsec;
    if (elapsed_nsec < 0) {
        /* borrow one second */
        elapsed_nsec += 1000000000L;
        elapsed_sec -= 1;
    }
    ts->tv_sec = elapsed_sec;
    ts->tv_nsec = elapsed_nsec;
    return 0;
}

/* Return the effective RLIMIT_MEMLOCK in bytes, i.e. how much memory this
 * process may lock with mlock()/MAP_LOCKED.
 *
 * The soft limit (rlim_cur) is reported because that is the value the kernel
 * enforces; the hard limit is only the ceiling the soft limit may be raised to.
 * An unlimited limit (RLIM_INFINITY) is reported as LONG_MAX rather than being
 * cast to long, which would turn it into -1 and collide with the error value.
 *
 * Returns -1 if getrlimit() fails. */
long get_rmem_limit() {
    struct rlimit lim;
    if (getrlimit(RLIMIT_MEMLOCK, &lim) != 0)
        return -1L;
    if (lim.rlim_cur == RLIM_INFINITY || lim.rlim_cur > (rlim_t)LONG_MAX)
        return LONG_MAX;
    return (long)lim.rlim_cur;
}

/* Open the copy source read-only (large-file capable), retrying when the
 * call is interrupted by a signal. The AIO strategy additionally requests
 * O_DIRECT. Returns the descriptor, or -1 with errno set. */
int open_source_file(char *path) {
    const int flags = O_LARGEFILE | O_RDONLY | (strategy == STRATEGY_AIO ? O_DIRECT : 0);
    int fd;
    for (;;) {
        fd = open(path, flags);
        if (fd != -1 || errno != EINTR)
            break;
    }
    return fd;
}

/* Open (creating with mode 0666 if needed) the copy destination for reading
 * and writing, retrying when interrupted by a signal. The AIO strategy
 * additionally requests O_DIRECT. An existing file is not truncated here.
 * Returns the descriptor, or -1 with errno set. */
int open_dest_file(char *path) {
    int flags = O_CREAT | O_LARGEFILE | O_RDWR;
    if (strategy == STRATEGY_AIO)
        flags |= O_DIRECT;
    int fd;
    while ((fd = open(path, flags, 0666)) == -1 && errno == EINTR)
        ;
    return fd;
}

/* Return the size in bytes of the file behind `fd`, or -1 if fstat() fails
 * (errno set). When `block_size` is non-NULL it receives st_blksize.
 * Side effect: starts the global copy timer (TIMER_START) before fstat(). */
long get_file_size(int fd, long *block_size) {
    struct stat st;
    TIMER_START();
    if (fstat(fd, &st) != 0)
        return -1L;
    if (block_size)
        *block_size = st.st_blksize;
    return st.st_size;
}

/* Make sure the file behind `fd` is at least `size` bytes long, reserving
 * the blocks up front when the filesystem supports it.
 *
 * - size == 0: nothing to reserve. fallocate() rejects a zero length with
 *   EINVAL, which used to make copying an empty file fail.
 * - fallocate() unsupported (EOPNOTSUPP): fall back to ftruncate() so the
 *   file still gets its final length (needed by the mmap strategies).
 *
 * Returns 0 on success, -1 with errno set on failure. */
int allocate_file(int fd, long size) {
    if (size == 0)
        return 0;
    if (fallocate(fd, 0, 0, size) == 0)
        return 0;
    if (errno != EOPNOTSUPP)
        return -1;
    return ftruncate(fd, size);
}

void preload_file_advise(int fd, long size, off_t base_offset) {
 int res = posix_fadvise(fd, base_offset, size, POSIX_FADV_WILLNEED | POSIX_FADV_SEQUENTIAL);
 if (res != 0) {
  int e = errno;
  SYSOUTF("[WARN] POSIX fadvise returned error '%s'\r\n", strerror(e));
 }
}

void* mmap_file(int fd, long size, int mode, off_t base_offset = 0) {
    preload_file_advise(fd, size, base_offset);
    int PERF_FLAGS[] = { MAP_HUGETLB | MAP_LOCKED, MAP_HUGETLB, MAP_LOCKED, 0 };
    void *addr = NULL;
    int i = 0;
    for (; i < 4; i++) {
  addr = mmap(NULL, size, mode == 0 ? PROT_READ : PROT_WRITE, MAP_SHARED | MAP_POPULATE | PERF_FLAGS[i], fd, base_offset);
  if (addr!= (void*)-1)
   break;
    }
    if (addr == (void*)-1)
  return NULL;
    if (i!=0 && warn_mem_params_not_optimal) {
  SYSOUT("[WARN] Memory access is not optimal (no huge TLB & pages locked into RAM)\r\n");
  warn_mem_params_not_optimal = 0;
    }
    int res = posix_madvise(addr, size, POSIX_MADV_WILLNEED | POSIX_MADV_SEQUENTIAL);
    if (res != 0) {
  int e = errno;
  SYSOUTF("[WARN] POSIX madvise returned error '%s'\r\n", strerror(e));
    }
    res = mlock(addr, size);
    if (res != 0) {
     int e = errno;
     SYSOUTF("[WARN] POSIX mlock returned error '%s'\r\n", strerror(e));
    }
    return addr;
}

/* Write exactly `len` bytes from `buff` to `fd` at file offset `base_off`,
 * then flush the data with fdatasync().
 *
 * Interrupted writes (EINTR) are retried. Partial writes resume from the first
 * unwritten byte of `buff`; the old code re-sent the buffer from its start and
 * corrupted the output. Success is decided by the byte count, never by errno,
 * which may hold a stale value.
 *
 * The global timer is not touched here: mmap_copy() calls this once per chunk
 * and stops the timer itself after the last one.
 *
 * Returns 0 on success, otherwise the errno of the failing call (EIO if
 * pwrite() stops making progress without reporting an error). */
int write_file(int fd, void *buff, long len, long base_off = 0) {
    const char *src = (const char *)buff;
    long done = 0;
    while (done < len) {
        ssize_t n = pwrite(fd, src + done, (size_t)(len - done), (off_t)(base_off + done));
        if (n > 0) {
            done += n;
        } else if (n < 0 && errno == EINTR) {
            continue;
        } else {
            return n < 0 ? errno : EIO;
        }
    }
    return fdatasync(fd) == 0 ? 0 : errno;
}

/* CLEANUP_EXIT(code): close both files and terminate the process with `code`.
 * The current mmap windows are deliberately left to exit(). mmap_copy() maps
 * at most one MMAP_CHUNK_SIZE chunk at a time, so munmap(addr, size_in) could
 * run past the real mapping and remove unrelated mappings while cleanup code
 * is still running. exit() drops every mapping anyway, and munmap() would
 * not have synced dirty MAP_SHARED pages either.
 * Both macros use do { } while (0) so they behave as one statement, e.g. in
 * an unbraced if/else. */
#define CLEANUP_EXIT(code) do { if (fd_in != -1) close(fd_in); if (fd_out != -1) close(fd_out); exit(code); } while (0)
/* CLEANUP_MMAP(size): unmap the current windows (each `size` bytes long) and reset the pointers. */
#define CLEANUP_MMAP(size) do { if (addr) { munmap(addr, (size)); addr = NULL; } if (addr2) { munmap(addr2, (size)); addr2 = NULL; } } while (0)

/* Open the source (argv[1]) and destination (argv[2]) files, record the
 * source size and block size, and size the destination to match.
 *
 * Sets the globals fd_in, fd_out, size_in and blk_size (and err_in/err_out on
 * failure). Any failure prints an error and exits through CLEANUP_EXIT with
 * codes -1..-4. Also warns when RLIMIT_MEMLOCK looks too small for the
 * mlock()-based mmap strategies. */
void open_files(char **argv) {
    long mlock_max = get_rmem_limit();
    SYSOUTF("[INFO] RLIMIT_MEMLOCK is %ldKB,\r\n", mlock_max/1024);
    if (mlock_max < 128*1024*1024) {
        SYSOUTF("[WARN] Please adjust memlock in /etc/security/limits.conf to something big like %d (KB), which is 32GB.\r\n", 32*ONE_MB);
    }
    fd_in = open_source_file(argv[1]); err_in = errno;
    if (fd_in == -1) {
        SYSOUTF("[ERROR] Cannot open source file %s due to error '%s'\r\n", argv[1], strerror(err_in));
        CLEANUP_EXIT(-1);
    }
    fd_out = open_dest_file(argv[2]); err_out = errno;
    if (fd_out == -1) {
        SYSOUTF("[ERROR] Cannot open destination file %s due to error '%s'\r\n", argv[2], strerror(err_out));
        CLEANUP_EXIT(-2);
    }
    size_in = get_file_size(fd_in, &blk_size); err_in = errno;
    if (size_in == -1L) {
        SYSOUTF("[ERROR] Unable determine file size for %s due to error '%s'\r\n", argv[1], strerror(err_in));
        CLEANUP_EXIT(-3);
    }
    SYSOUTF("[INFO] File size for %s is %ld, block size is %ld.\r\n", argv[1], size_in, blk_size);
    if (allocate_file(fd_out, size_in) != 0) {
        err_out = errno;
        SYSOUTF("[ERROR] Cannot allocate file %s due to error '%s'\r\n", argv[2], strerror(err_out));
        CLEANUP_EXIT(-4);
    }
    /* allocate_file() grows/reserves the destination but does not cut a
     * pre-existing, larger file, whose stale tail would survive the copy.
     * Truncating to the source size is a no-op when the sizes already match
     * (including copying a file onto itself). */
    if (ftruncate(fd_out, size_in) != 0) {
        err_out = errno;
        SYSOUTF("[ERROR] Cannot truncate file %s due to error '%s'\r\n", argv[2], strerror(err_out));
        CLEANUP_EXIT(-4);
    }
}

/* `mode` argument of mmap_file(): map the source read-only (MMAP_IN) or the
 * destination writable (MMAP_OUT). */
enum { MMAP_IN = 0, MMAP_OUT = 1 };

/* Bytes mmap_copy() maps per window (128 MiB); override with -DMMAP_CHUNK_SIZE=... */
#if !defined(MMAP_CHUNK_SIZE)
#define MMAP_CHUNK_SIZE (128 * 1024 * 1024)
#endif

/* Copy the source to the destination one MMAP_CHUNK_SIZE window at a time.
 *
 * STRATEGY_MMAP_WRITE maps only the source and pwrite()s each window.
 * STRATEGY_MMAP_BOTH maps both files, memcpy()s the window and msync()s it.
 * Errors exit the process via CLEANUP_EXIT (-5..-8); argv[1]/argv[2] are
 * only used in the error messages.
 *
 * The global timer is stopped exactly once, after the last window. Stopping
 * it per window would turn the stored interval into a bogus value, because
 * TIMER_STOP overwrites the start time with the elapsed time. */
void mmap_copy(char **argv) {
    for (long base = 0; base < size_in; base += MMAP_CHUNK_SIZE) {
        long size = size_in - base;
        if (size > MMAP_CHUNK_SIZE)
            size = MMAP_CHUNK_SIZE;

        addr = mmap_file(fd_in, size, MMAP_IN, base);
        if (addr == NULL) {
            err_in = errno;
            SYSOUTF("[ERROR] Cannot load file %s into memory due to error '%s'\r\n", argv[1], strerror(err_in));
            CLEANUP_EXIT(-5);
        }
        if (strategy == STRATEGY_MMAP_WRITE) {
            int res = write_file(fd_out, addr, size, base);
            if (res != 0) {
                SYSOUTF("[ERROR] Could not write file %s due to error '%s'\r\n", argv[2], strerror(res));
                CLEANUP_EXIT(-6);
            }
        } else if (strategy == STRATEGY_MMAP_BOTH) {
            addr2 = mmap_file(fd_out, size, MMAP_OUT, base);
            if (addr2 == NULL) {
                err_out = errno;
                SYSOUTF("[ERROR] Cannot load file %s into memory due to error '%s'\r\n", argv[2], strerror(err_out));
                CLEANUP_EXIT(-7);
            }
            memcpy(addr2, addr, size);
            if (msync(addr2, size, MS_SYNC) != 0) {
                err_out = errno;
                SYSOUTF("[ERROR] Cannot sync file %s due to error '%s'\r\n", argv[2], strerror(err_out));
                CLEANUP_EXIT(-8);
            }
        }
        CLEANUP_MMAP(size);
    }
    TIMER_STOP();
}

/* Copy size_in bytes from fd_in to fd_out with sendfile(2), looping until
 * everything has been transferred.
 *
 * EAGAIN and EINTR retry the call. A zero return means the source ended
 * early (e.g. it was truncated during the copy); the loop stops instead of
 * spinning forever and the short copy is reported.
 *
 * Returns 1 on success (and stops the global timer), -1 on failure with
 * err_out holding the last errno seen. */
int sendfile_copy() {
    err_out = 0;
    long to_write = size_in;
    while (to_write > 0) {
        ssize_t written_once = sendfile(fd_out, fd_in, NULL, to_write);
        if (written_once > 0) {
            to_write -= written_once;
            continue;
        }
        if (written_once == 0)
            break; /* unexpected EOF on the source */
        err_out = errno;
        if (err_out == EAGAIN || err_out == EINTR)
            continue;
        SYSOUTF("[ERROR] Function sendfile is not supported '%s'\r\n", strerror(err_out));
        return -1;
    }
    if (to_write != 0) {
        SYSOUTF("[ERROR] Function sendfile written too small data: %ld vs %ld, error=%s\r\n", size_in - to_write, size_in, strerror(err_out));
        return -1;
    }
    TIMER_STOP();
    return 1;
}

/* Size of the stack buffer serial_copy() moves per read()/write() round trip;
 * override with -DCOPY_BLOCK_SIZE=... */
#if !defined(COPY_BLOCK_SIZE)
#define COPY_BLOCK_SIZE 4096
#endif

/* Copy the whole source to the destination with plain read(2)/write(2)
 * through a COPY_BLOCK_SIZE stack buffer.
 *
 * EOF is recognised only by read() returning 0. The old code consulted errno,
 * whose value is unspecified after a successful call, and could report a
 * spurious failure at EOF. EINTR retries the interrupted call instead of
 * silently ending the copy. Partial writes resume after the bytes already
 * written instead of resending the buffer from its start.
 *
 * Exits via CLEANUP_EXIT (-9 on read failure, -10 on write failure); stops
 * the global timer on success. */
void serial_copy() {
    char buff[COPY_BLOCK_SIZE];
    preload_file_advise(fd_in, size_in, 0);
    for (;;) {
        ssize_t in = read(fd_in, buff, COPY_BLOCK_SIZE);
        if (in == 0)
            break; /* EOF */
        if (in < 0) {
            if (errno == EINTR)
                continue;
            err_in = errno;
            SYSOUTF("[ERROR] Broken reading of input file '%s'\r\n", strerror(err_in));
            CLEANUP_EXIT(-9);
        }
        const char *p = buff;
        while (in > 0) {
            ssize_t cnt = write(fd_out, p, (size_t)in);
            if (cnt < 0) {
                if (errno == EINTR)
                    continue;
                err_out = errno;
                SYSOUTF("[ERROR] Broken writing to output file '%s'\r\n", strerror(err_out));
                CLEANUP_EXIT(-10);
            }
            p += cnt;
            in -= cnt;
        }
    }
    TIMER_STOP();
}

#define PG_SIZE 4096
#define AIO_MAX_NR 1024

#ifdef AIO_VIA_SYSCALL
/* Minimal stand-ins for the libaio entry points, built directly on the Linux
 * AIO system calls. The kernel ABI takes the AIO context as the first
 * argument of every call except io_setup(). Like libaio, failures are
 * returned as -errno (syscall() also leaves errno set). */
static inline int aio_syscall_ret(long res) {
    return res < 0 ? -errno : (int)res;
}
inline int io_setup(unsigned nr, aio_context_t *ctxp) {
    return aio_syscall_ret(syscall(__NR_io_setup, nr, ctxp));
}
inline int io_destroy(aio_context_t ctx) {
    return aio_syscall_ret(syscall(__NR_io_destroy, ctx));
}
inline int io_submit(aio_context_t ctx, long nr, struct iocb **cbp) {
    return aio_syscall_ret(syscall(__NR_io_submit, ctx, nr, cbp));
}
inline int io_cancel(aio_context_t ctx, struct iocb *cb, struct io_event *result) {
    return aio_syscall_ret(syscall(__NR_io_cancel, ctx, cb, result));
}
inline int io_getevents(aio_context_t ctx, long min_nr, long nr, struct io_event *events, struct timespec *timeout) {
    return aio_syscall_ret(syscall(__NR_io_getevents, ctx, min_nr, nr, events, timeout));
}
/* Prepare `cb` as a read of `size` bytes at `offset` into `buff`. The iocb is
 * cleared first (as libaio's io_prep_pread does) so no stale field from a
 * previous request is submitted. */
inline void io_prep_pread(struct iocb *cb, int fd, void *buff, long size, long offset) {
    memset(cb, 0, sizeof *cb);
    cb->aio_fildes = fd;
    cb->aio_lio_opcode = IOCB_CMD_PREAD;
    cb->aio_buf = (uint64_t)(uintptr_t)buff;
    cb->aio_offset = offset;
    cb->aio_nbytes = size;
}
#else
/* libaio build: expose the kernel opcode names used by aio_copy(). */
#define IOCB_CMD_PREAD  0
#define IOCB_CMD_PWRITE  1
#define IOCB_CMD_FSYNC  2
#define IOCB_CMD_FDSYNC  3
#define IOCB_CMD_NOOP  6
#define aio_context_t io_context_t
#endif

void aio_copy() {
 int chunks = (size_in + (PG_SIZE/2) - 1) / PG_SIZE; /* with padding */
 SYSOUTF("[INFO] AIO copy needs %d chunks\r\n", chunks);
 aio_context_t ctx = 0;
 struct iocb* cbs = (struct iocb*)malloc(AIO_MAX_NR * sizeof(struct iocb));
 struct iocb** cbps = (struct iocb**)malloc(AIO_MAX_NR * sizeof(struct iocb*));
 struct iocb** wcbps = (struct iocb**)malloc(AIO_MAX_NR * sizeof(struct iocb*));
 struct io_event* ioevents = (struct io_event*)malloc(AIO_MAX_NR * sizeof(struct io_event));

 if (cbs==NULL || cbps==NULL || wcbps==NULL || ioevents==NULL) {
  int ret = errno;
  SYSOUTF("[ERROR] Broken malloc '%s'\r\n", strerror(ret));
  CLEANUP_EXIT(-12);
 } 
 
 int ret = io_setup(AIO_MAX_NR, &ctx);
 if (ret < 0) {
  ret = errno;
  SYSOUTF("[ERROR] Broken io_setup for %d chunks '%s'\r\n", chunks, strerror(ret));
  CLEANUP_EXIT(-11); 
 }
 void *bigbuff = malloc(AIO_MAX_NR*PG_SIZE);
#define DELETE_MEM do { if (bigbuff!=NULL) free(bigbuff); free(cbs); free(cbps); free(wcbps); free(ioevents); io_destroy(ctx); } while(0)
 if (bigbuff == NULL) {
  ret = errno;
  SYSOUTF("[ERROR] Buffer allocation failed for input file: '%s'\r\n", strerror(ret));
  DELETE_MEM;
  CLEANUP_EXIT(-12); 
 }
 
 int rounds = (chunks + AIO_MAX_NR/2 - 1) / AIO_MAX_NR; /* with padding */
#define AIO_GROUP_TO_OFFSET(i) (long(i) * PG_SIZE * AIO_MAX_NR)
#define AIO_STEP_TO_OFFSET(i) (long(i) * PG_SIZE)
 for (int i=0; i < AIO_MAX_NR; i++) {
  memset(&cbs[i], 0, sizeof(struct iocb));
  cbps[i] = &cbs[i];
 }

 for (int group=0; group < rounds; group++) {
  int step = 0;
  for (; step < AIO_MAX_NR; step++) {
   long curr_off = AIO_GROUP_TO_OFFSET(group) + AIO_STEP_TO_OFFSET(step);
   long nbytes = size_in - curr_off;
   if (nbytes <= 0) /* we are inside padding */
    break;
   if (nbytes > PG_SIZE)
    nbytes = PG_SIZE;
 
   long pbuff = (long)bigbuff + AIO_STEP_TO_OFFSET(step);
   io_prep_pread(&cbs[step], fd_in, (void*)pbuff, nbytes, curr_off);
  }
  if (step == 0)
   break;
  int todo = io_submit(ctx, step, cbps);
  if (todo != step) {
   ret = errno;
   SYSOUTF("[ERROR] Broken io_submit, group=%d, queued=%d; '%s'\r\n", group, todo >= 0 ? todo : -1, todo < 0 ? strerror(ret) : "failure for given index");
   DELETE_MEM;
   CLEANUP_EXIT(-13); 
  }
  
  int wi = 0;
  while (todo > 0) {
   ret = io_getevents(ctx, 1, todo, ioevents, NULL);
   if (ret < 0) {
    ret = errno;
    SYSOUTF("[ERROR] Broken io_getevents '%s'\r\n", strerror(ret));
    DELETE_MEM;
    CLEANUP_EXIT(-14); 
   }
   else if (ret > 0) {
    todo -= ret;
    int to_write = 0;
    for (int i=0; i < ret; i++) {
     struct iocb *iocbp = (struct iocb*) ioevents[i].obj;
     if (iocbp->aio_lio_opcode == IOCB_CMD_PREAD) {
      iocbp->aio_fildes = fd_out;
      iocbp->aio_lio_opcode = IOCB_CMD_PWRITE;
      wcbps[wi++] = iocbp;
      to_write++;
     }
    }
    if (to_write > 0) {
     int e = io_submit(ctx, to_write, &wcbps[wi - to_write]); /* move along linearly */
     if (e != to_write) {
      ret = errno;
      SYSOUTF("[ERROR] Broken io_submit for write '%s'\r\n", e < 0 ? strerror(ret) : "failure for given index");
      DELETE_MEM;
      CLEANUP_EXIT(-15); 
     }  
     else
      todo += to_write;
    }
   }
  }
 }
 DELETE_MEM;
 TIMER_STOP();
}

/* Entry point: `prog SRC_FILE DST_FILE [--strategy=...]`.
 * Picks the copy strategy from the optional third argument (default:
 * MMAP+WRITE), opens and sizes the files, runs the copy and prints the
 * elapsed time. Copy failures exit with a non-zero status; only a completed
 * copy prints "Success". */
int main(int argc, char **argv) {
    if (argc < 3) {
        SYSOUTF("Usage: %s SRC_FILE DST_FILE --strategy={mmap,serial,sendfile,aio,default}\r\n", argv[0]);
        return 0;
    }
    if (argc == 4) {
        if (strncmp(argv[3], "--strategy=mmap", 15) == 0)
            strategy = STRATEGY_MMAP_BOTH;
        else if (strncmp(argv[3], "--strategy=serial", 17) == 0)
            strategy = STRATEGY_SERIAL;
        else if (strncmp(argv[3], "--strategy=sendfile", 19) == 0)
            strategy = STRATEGY_SENDFILE;
        else if (strncmp(argv[3], "--strategy=aio", 14) == 0)
            strategy = STRATEGY_AIO;
    }

    open_files(argv);
    switch (strategy) {
    case STRATEGY_SERIAL:
        serial_copy();
        break;
    case STRATEGY_SENDFILE:
        /* sendfile_copy() reports failure by returning -1 rather than
         * exiting; without this check a failed copy still printed "Success". */
        if (sendfile_copy() < 0) {
            CLEANUP_EXIT(-18);
        }
        break;
    case STRATEGY_AIO:
        aio_copy();
        break;
    default:
        mmap_copy(argv);
        break;
    }

    SYSOUTF("\r\n[INFO] Success. Strategy was %s\r\n", STRATEGY_DESC[strategy]);
    PRINT_WALL_TIME();
    CLEANUP_EXIT(0);
    return 0;
}

0 komentarze: