#ifndef _GNU_SOURCE #define _GNU_SOURCE #endif #include <stdio.h> #include <stdlib.h> #include <stdarg.h> #include <malloc.h> #include <unistd.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <string.h> #include <dlfcn.h> #include <execinfo.h> #include <pthread.h> #include <sys/time.h> #include <sys/resource.h> #include <sys/mman.h> #include <sys/sendfile.h> #include <errno.h> #include <time.h> #include <sys/syscall.h> #include <inttypes.h> #ifdef AIO_VIA_SYSCALL #include <linux/aio_abi.h> #else #include <libaio.h> #endif #include <list> #define SYSOUTF(msg, params...) do { char buff[1024]; memset(buff, 0, 1024); snprintf(buff, 1024, msg, params); write(1,buff,strlen(buff)); } while(0) #define SYSOUT(msg) write(1, msg, strlen(msg)) #define ONE_MB (1024*1024) #define ONE_GB (1024L*ONE_MB) #define STRATEGY_MMAP_WRITE 0 #define STRATEGY_MMAP_BOTH 1 #define STRATEGY_SERIAL 2 #define STRATEGY_SENDFILE 3 #define STRATEGY_AIO 4 static const char* STRATEGY_DESC[] = { "MMAP+WRITE", "MMAPx2+MEMCPY", "READ+WRITE", "SENDFILE", "AIO" }; static struct timespec __ts, __ts_res; static clockid_t __clock = CLOCK_REALTIME; static int strategy = STRATEGY_MMAP_WRITE; static int fd_in = -1, fd_out = -1, err_in = 0, err_out = 0; static void *addr = NULL, *addr2 = NULL; static long size_in = 0, blk_size = 0; static int warn_mem_params_not_optimal = 1; #define TIMER_START() do { timer_start(&__ts,__clock); } while(0) #define TIMER_STOP() do { timer_stop(&__ts,__clock); clock_getres(__clock,&__ts_res); } while(0) #define PRINT_WALL_TIME() do { long msec = __ts.tv_nsec/1000000L; SYSOUTF("[INFO] WALL TIME: %ldsec %ldmsec %ldusec +- %ldnsec\n",(long)__ts.tv_sec,msec,(__ts.tv_nsec - msec*1000000L)/1000L,__ts_res.tv_nsec); } while(0) static inline int timer_start(struct timespec *ts, clockid_t clid) { int res = clock_gettime(clid,ts); if (res!=0) { int e = errno; SYSOUTF("Failed timer_start due to '%s'\r\n", strerror(e)); } return res; } static inline int 
timer_stop(struct timespec *ts, clockid_t clid) { struct timespec ts1; int res = clock_gettime(clid,&ts1); if (res!=0) { int e = errno; SYSOUTF("Failed timer_stop due to '%s'\r\n", strerror(e)); return res; } ts->tv_sec = ts1.tv_sec - ts->tv_sec; ts->tv_nsec = ts1.tv_nsec - ts->tv_nsec; if (ts->tv_nsec < 0) { ts->tv_nsec += 1000000000L; ts->tv_sec--; } return 0; } long get_rmem_limit() { struct rlimit rlimit; int res = getrlimit(RLIMIT_MEMLOCK, &rlimit); if (res == 0) { return (long)rlimit.rlim_max; } return -1L; } int open_source_file(char *path) { int res = -1; int extra_flags = strategy == STRATEGY_AIO ? O_DIRECT : 0; do { res = open(path, O_LARGEFILE | O_RDONLY | extra_flags); } while (res == -1 && errno == EINTR); return res; } int open_dest_file(char *path) { int res = -1; int extra_flags = strategy == STRATEGY_AIO ? O_DIRECT : 0; do { res = open(path, O_CREAT | O_LARGEFILE | O_RDWR | extra_flags, 0666); } while (res == -1 && errno == EINTR); return res; } long get_file_size(int fd, long *block_size) { struct stat stat; TIMER_START(); int res = fstat(fd, &stat); if (res == 0) { if (block_size != NULL) *block_size = stat.st_blksize; return stat.st_size; } return -1L; } int allocate_file(int fd, long size) { return fallocate(fd, 0, 0, size); } void preload_file_advise(int fd, long size, off_t base_offset) { int res = posix_fadvise(fd, base_offset, size, POSIX_FADV_WILLNEED | POSIX_FADV_SEQUENTIAL); if (res != 0) { int e = errno; SYSOUTF("[WARN] POSIX fadvise returned error '%s'\r\n", strerror(e)); } } void* mmap_file(int fd, long size, int mode, off_t base_offset = 0) { preload_file_advise(fd, size, base_offset); int PERF_FLAGS[] = { MAP_HUGETLB | MAP_LOCKED, MAP_HUGETLB, MAP_LOCKED, 0 }; void *addr = NULL; int i = 0; for (; i < 4; i++) { addr = mmap(NULL, size, mode == 0 ? 
PROT_READ : PROT_WRITE, MAP_SHARED | MAP_POPULATE | PERF_FLAGS[i], fd, base_offset); if (addr!= (void*)-1) break; } if (addr == (void*)-1) return NULL; if (i!=0 && warn_mem_params_not_optimal) { SYSOUT("[WARN] Memory access is not optimal (no huge TLB & pages locked into RAM)\r\n"); warn_mem_params_not_optimal = 0; } int res = posix_madvise(addr, size, POSIX_MADV_WILLNEED | POSIX_MADV_SEQUENTIAL); if (res != 0) { int e = errno; SYSOUTF("[WARN] POSIX madvise returned error '%s'\r\n", strerror(e)); } res = mlock(addr, size); if (res != 0) { int e = errno; SYSOUTF("[WARN] POSIX mlock returned error '%s'\r\n", strerror(e)); } return addr; } int write_file(int fd, void *buff, long len, long base_off = 0) { long to_be_written = len; long off = 0; do { long res = pwrite(fd, buff, to_be_written, off + base_off); if (res > 0) { to_be_written -= res; off += res; if (to_be_written == 0L) break; } else if (res < 0 && errno != EINTR) break; } while (errno == EINTR); TIMER_STOP(); if (errno == 0) fdatasync(fd); return to_be_written == 0 ? 
0 : errno; } #define CLEANUP_EXIT(code) { if (addr) munmap(addr, size_in); if (addr2) munmap(addr2, size_in); if (fd_in!=-1) close(fd_in); if (fd_out!=-1) close(fd_out); exit(code); } while(0) #define CLEANUP_MMAP(size) { if (addr) { munmap(addr, size); addr = NULL; } if (addr2) { munmap(addr2, size); addr2 = NULL; } } while(0) void open_files(char **argv) { long mlock_max = get_rmem_limit(); SYSOUTF("[INFO] RLIMIT_MEMLOCK is %ldKB,\r\n", mlock_max/1024); if (mlock_max < 128*1024*1024) { SYSOUTF("[WARN] Please adjust memlock in /etc/security/limits.conf to something big like %d (KB), which is 32GB.\r\n", 32*ONE_MB); } fd_in = open_source_file(argv[1]); err_in = errno; if (fd_in == -1) { SYSOUTF("[ERROR] Cannot open source file %s due to error '%s'\r\n", argv[1], strerror(err_in)); CLEANUP_EXIT(-1); } fd_out = open_dest_file(argv[2]); err_out = errno; if (fd_out == -1) { SYSOUTF("[ERROR] Cannot open destination file %s due to error '%s'\r\n", argv[2], strerror(err_out)); CLEANUP_EXIT(-2); } size_in = get_file_size(fd_in, &blk_size); err_in = errno; if (size_in == -1L) { SYSOUTF("[ERROR] Unable determine file size for %s due to error '%s'\r\n", argv[1], strerror(err_in)); CLEANUP_EXIT(-3); } SYSOUTF("[INFO] File size for %s is %ld, block size is %ld.\r\n", argv[1], size_in, blk_size); if (allocate_file(fd_out, size_in) != 0) { err_out = errno; SYSOUTF("[ERROR] Cannot allocate file %s due to error '%s'\r\n", argv[2], strerror(err_out)); CLEANUP_EXIT(-4); } } #define MMAP_IN 0 #define MMAP_OUT 1 #ifndef MMAP_CHUNK_SIZE #define MMAP_CHUNK_SIZE (128*1024*1024) #endif void mmap_copy(char **argv) { for (long base = 0; base < size_in; base += MMAP_CHUNK_SIZE) { long size = size_in - base; if (size > MMAP_CHUNK_SIZE) size = MMAP_CHUNK_SIZE; else if (size <= 0) break; addr = mmap_file(fd_in, size, MMAP_IN, base); if (addr == NULL) { err_in = errno; SYSOUTF("[ERROR] Cannot load file %s into memory due to error '%s'\r\n", argv[1], strerror(err_in)); CLEANUP_EXIT(-5); } if 
(strategy == STRATEGY_MMAP_WRITE) { int res = write_file(fd_out, addr, size, base); if (res!=0) { SYSOUTF("[ERROR] Could not write file %s due to error '%s'\r\n", argv[2], strerror(res)); CLEANUP_EXIT(-6); } } else if (strategy == STRATEGY_MMAP_BOTH) { addr2 = mmap_file(fd_out, size, MMAP_OUT, base); if (addr2 == NULL) { err_out = errno; SYSOUTF("[ERROR] Cannot load file %s into memory due to error '%s'\r\n", argv[2], strerror(err_out)); CLEANUP_EXIT(-7); } else { memcpy(addr2, addr, size); TIMER_STOP(); if (msync(addr2, size, MS_SYNC) != 0) { err_out = errno; SYSOUTF("[ERROR] Cannot sync file %s due to error '%s'\r\n", argv[2], strerror(err_out)); CLEANUP_EXIT(-8); } } } CLEANUP_MMAP(size); } TIMER_STOP(); } int sendfile_copy() { err_out = 0; long to_write = size_in; long written_once = 0; while (to_write > 0) { written_once = sendfile(fd_out, fd_in, NULL, to_write); if (written_once == -1) { err_out = errno; if (err_out != EAGAIN) { SYSOUTF("[ERROR] Function sendfile is not supported '%s'\r\n", strerror(err_out)); return -1; } else continue; } else { to_write -= written_once; } } if (to_write != 0) { SYSOUTF("[ERROR] Function sendfile written too small data: %ld vs %ld, error=%s\r\n", size_in - to_write, size_in, strerror(err_out)); return -1; } TIMER_STOP(); return 1; } #ifndef COPY_BLOCK_SIZE #define COPY_BLOCK_SIZE 4096 #endif void serial_copy() { char buff[COPY_BLOCK_SIZE]; long in = 0; long out = 0; long cnt = 0; preload_file_advise(fd_in, size_in, 0); do { in=read(fd_in, (void*)buff, COPY_BLOCK_SIZE); if (in>0) { out = in; do { cnt = write(fd_out, buff, out); err_out = errno; if (cnt < 0 && err_out != EINTR) { SYSOUTF("[ERROR] Broken writing to output file '%s'\r\n", strerror(err_out)); CLEANUP_EXIT(-10); } else if (cnt > 0) { out -= cnt; } } while (out > 0); } else { if (errno == 0) { if (in==0) continue; else break; } if (errno != EINTR) { err_in = errno; SYSOUTF("[ERROR] Broken reading of input file '%s'\r\n", strerror(err_in)); CLEANUP_EXIT(-9); } else 
continue; } } while (in > 0); TIMER_STOP(); } #define PG_SIZE 4096 #define AIO_MAX_NR 1024 #ifdef AIO_VIA_SYSCALL inline int io_setup(unsigned nr, aio_context_t *ctxp) { return syscall(__NR_io_setup, nr, ctxp); } inline int io_destroy(aio_context_t ctx) { return syscall(__NR_io_destroy, ctx); } inline int io_submit(aio_context_t ctx, long nr, struct iocb **cbp) { return syscall(__NR_io_submit, nr, cbp); } inline int io_cancel(aio_context_t ctx, struct iocb *cb, struct io_event *result) { return syscall(__NR_io_cancel, cb, result); } inline int io_getevents(aio_context_t ctx, long min_nr, long nr, struct io_event *events, struct timespec *timeout) { return syscall(__NR_io_getevents, min_nr, nr, events, timeout); } inline void io_prep_pread(struct iocb *cb, int fd, void *buff, long size, long offset) { cb->aio_fildes = fd; cb->aio_lio_opcode = IOCB_CMD_PREAD; cb->aio_buf = (uint64_t)buff; cb->aio_offset = offset; cb->aio_nbytes = size; } #else #define IOCB_CMD_PREAD 0 #define IOCB_CMD_PWRITE 1 #define IOCB_CMD_FSYNC 2 #define IOCB_CMD_FDSYNC 3 #define IOCB_CMD_NOOP 6 #define aio_context_t io_context_t #endif void aio_copy() { int chunks = (size_in + (PG_SIZE/2) - 1) / PG_SIZE; /* with padding */ SYSOUTF("[INFO] AIO copy needs %d chunks\r\n", chunks); aio_context_t ctx = 0; struct iocb* cbs = (struct iocb*)malloc(AIO_MAX_NR * sizeof(struct iocb)); struct iocb** cbps = (struct iocb**)malloc(AIO_MAX_NR * sizeof(struct iocb*)); struct iocb** wcbps = (struct iocb**)malloc(AIO_MAX_NR * sizeof(struct iocb*)); struct io_event* ioevents = (struct io_event*)malloc(AIO_MAX_NR * sizeof(struct io_event)); if (cbs==NULL || cbps==NULL || wcbps==NULL || ioevents==NULL) { int ret = errno; SYSOUTF("[ERROR] Broken malloc '%s'\r\n", strerror(ret)); CLEANUP_EXIT(-12); } int ret = io_setup(AIO_MAX_NR, &ctx); if (ret < 0) { ret = errno; SYSOUTF("[ERROR] Broken io_setup for %d chunks '%s'\r\n", chunks, strerror(ret)); CLEANUP_EXIT(-11); } void *bigbuff = malloc(AIO_MAX_NR*PG_SIZE); #define 
DELETE_MEM do { if (bigbuff!=NULL) free(bigbuff); free(cbs); free(cbps); free(wcbps); free(ioevents); io_destroy(ctx); } while(0) if (bigbuff == NULL) { ret = errno; SYSOUTF("[ERROR] Buffer allocation failed for input file: '%s'\r\n", strerror(ret)); DELETE_MEM; CLEANUP_EXIT(-12); } int rounds = (chunks + AIO_MAX_NR/2 - 1) / AIO_MAX_NR; /* with padding */ #define AIO_GROUP_TO_OFFSET(i) (long(i) * PG_SIZE * AIO_MAX_NR) #define AIO_STEP_TO_OFFSET(i) (long(i) * PG_SIZE) for (int i=0; i < AIO_MAX_NR; i++) { memset(&cbs[i], 0, sizeof(struct iocb)); cbps[i] = &cbs[i]; } for (int group=0; group < rounds; group++) { int step = 0; for (; step < AIO_MAX_NR; step++) { long curr_off = AIO_GROUP_TO_OFFSET(group) + AIO_STEP_TO_OFFSET(step); long nbytes = size_in - curr_off; if (nbytes <= 0) /* we are inside padding */ break; if (nbytes > PG_SIZE) nbytes = PG_SIZE; long pbuff = (long)bigbuff + AIO_STEP_TO_OFFSET(step); io_prep_pread(&cbs[step], fd_in, (void*)pbuff, nbytes, curr_off); } if (step == 0) break; int todo = io_submit(ctx, step, cbps); if (todo != step) { ret = errno; SYSOUTF("[ERROR] Broken io_submit, group=%d, queued=%d; '%s'\r\n", group, todo >= 0 ? todo : -1, todo < 0 ? strerror(ret) : "failure for given index"); DELETE_MEM; CLEANUP_EXIT(-13); } int wi = 0; while (todo > 0) { ret = io_getevents(ctx, 1, todo, ioevents, NULL); if (ret < 0) { ret = errno; SYSOUTF("[ERROR] Broken io_getevents '%s'\r\n", strerror(ret)); DELETE_MEM; CLEANUP_EXIT(-14); } else if (ret > 0) { todo -= ret; int to_write = 0; for (int i=0; i < ret; i++) { struct iocb *iocbp = (struct iocb*) ioevents[i].obj; if (iocbp->aio_lio_opcode == IOCB_CMD_PREAD) { iocbp->aio_fildes = fd_out; iocbp->aio_lio_opcode = IOCB_CMD_PWRITE; wcbps[wi++] = iocbp; to_write++; } } if (to_write > 0) { int e = io_submit(ctx, to_write, &wcbps[wi - to_write]); /* move along linearly */ if (e != to_write) { ret = errno; SYSOUTF("[ERROR] Broken io_submit for write '%s'\r\n", e < 0 ? 
strerror(ret) : "failure for given index"); DELETE_MEM; CLEANUP_EXIT(-15); } else todo += to_write; } } } } DELETE_MEM; TIMER_STOP(); } int main(int argc, char **argv) { if (argc < 3) { SYSOUTF("Usage: %s SRC_FILE DST_FILE --strategy={mmap,serial,sendfile,aio,default}\r\n", argv[0]); return 0; } if (argc == 4 && strncmp(argv[3], "--strategy=mmap", 15) == 0) strategy = STRATEGY_MMAP_BOTH; else if (argc == 4 && strncmp(argv[3], "--strategy=serial", 17) == 0) strategy = STRATEGY_SERIAL; else if (argc == 4 && strncmp(argv[3], "--strategy=sendfile", 19) == 0) strategy = STRATEGY_SENDFILE; else if (argc == 4 && strncmp(argv[3], "--strategy=aio", 14) == 0) strategy = STRATEGY_AIO; open_files(argv); if (strategy == STRATEGY_SERIAL) serial_copy(); else if (strategy == STRATEGY_SENDFILE) sendfile_copy(); else if (strategy == STRATEGY_AIO) aio_copy(); else mmap_copy(argv); SYSOUTF("\r\n[INFO] Success. Strategy was %s\r\n", STRATEGY_DESC[strategy]); PRINT_WALL_TIME(); CLEANUP_EXIT(0); return 0; }
czwartek, grudnia 18, 2014
How to copy an EMS datastore for inter-DC transport? It ought to be _fast_.
Subskrybuj:
Komentarze do posta (Atom)
0 komentarze:
Prześlij komentarz