#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <malloc.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <string.h>
#include <dlfcn.h>
#include <execinfo.h>
#include <pthread.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/mman.h>
#include <sys/sendfile.h>
#include <errno.h>
#include <time.h>
#include <sys/syscall.h>
#include <inttypes.h>
#include <limits.h>
#ifdef AIO_VIA_SYSCALL
#include <linux/aio_abi.h>
#else
#include <libaio.h>
#endif
#include <list>
/*
 * SYSOUTF: format a message into a zeroed 1024-byte stack buffer and write it
 * to file descriptor 1 with write(2). Uses the GNU named-variadic macro
 * syntax ("params..."); the expansion places params directly after msg, so at
 * least one argument must follow the format string.
 */
#define SYSOUTF(msg, params...) do { char buff[1024]; memset(buff, 0, 1024); snprintf(buff, 1024, msg, params); write(1,buff,strlen(buff)); } while(0)
/* SYSOUT: write a NUL-terminated string to file descriptor 1 as-is. */
#define SYSOUT(msg) write(1, msg, strlen(msg))
/* Size constants in bytes. */
#define ONE_MB (1024*1024)
#define ONE_GB (1024L*ONE_MB)
/* Copy strategy identifiers; each value is also the index into STRATEGY_DESC. */
#define STRATEGY_MMAP_WRITE 0
#define STRATEGY_MMAP_BOTH 1
#define STRATEGY_SERIAL 2
#define STRATEGY_SENDFILE 3
#define STRATEGY_AIO 4
/* Human-readable strategy names, indexed by the STRATEGY_* value. */
static const char* STRATEGY_DESC[] = { "MMAP+WRITE", "MMAPx2+MEMCPY", "READ+WRITE", "SENDFILE", "AIO" };
/*
 * __ts holds the start time after TIMER_START and the elapsed time after
 * TIMER_STOP; __ts_res receives the clock resolution in TIMER_STOP.
 */
static struct timespec __ts, __ts_res;
/* Clock used for all measurements. */
static clockid_t __clock = CLOCK_REALTIME;
/* Selected copy strategy (one of STRATEGY_*). */
static int strategy = STRATEGY_MMAP_WRITE;
/* Source/destination descriptors (-1 when not open) and their last saved errno values. */
static int fd_in = -1, fd_out = -1, err_in = 0, err_out = 0;
/* Current source (addr) and destination (addr2) mappings, NULL when unmapped. */
static void *addr = NULL, *addr2 = NULL;
/* Source file size in bytes and the st_blksize reported by fstat for it. */
static long size_in = 0, blk_size = 0;
/* Non-zero until the "memory access is not optimal" warning has been printed once. */
static int warn_mem_params_not_optimal = 1;
/* Record the start time in __ts. */
#define TIMER_START() do { timer_start(&__ts,__clock); } while(0)
/*
 * Replace the start time in __ts with the elapsed time and query the clock
 * resolution. timer_stop overwrites __ts in place, so the result is only
 * meaningful for the first TIMER_STOP after a TIMER_START.
 */
#define TIMER_STOP() do { timer_stop(&__ts,__clock); clock_getres(__clock,&__ts_res); } while(0)
/* Print __ts split into seconds, milliseconds and microseconds, plus the clock resolution in nanoseconds. */
#define PRINT_WALL_TIME() do { long msec = __ts.tv_nsec/1000000L; SYSOUTF("[INFO] WALL TIME: %ldsec %ldmsec %ldusec +- %ldnsec\n",(long)__ts.tv_sec,msec,(__ts.tv_nsec - msec*1000000L)/1000L,__ts_res.tv_nsec); } while(0)
/*
 * Store the current time of clock `clid` in `*ts`.
 *
 * Returns the clock_gettime() result (0 on success). On failure a diagnostic
 * with the errno text is written to stdout and the non-zero result is
 * returned.
 */
static inline int timer_start(struct timespec *ts, clockid_t clid) {
    const int rc = clock_gettime(clid, ts);
    if (rc == 0)
        return rc;

    /* Capture errno before any other call can overwrite it. */
    const int saved_errno = errno;
    SYSOUTF("Failed timer_start due to '%s'\r\n", strerror(saved_errno));
    return rc;
}
/*
 * Replace the start time stored in `*ts` with the time elapsed since then,
 * measured on clock `clid`.
 *
 * Returns 0 on success. If the clock cannot be read, a diagnostic is written
 * to stdout, `*ts` is left untouched and the clock_gettime() result is
 * returned.
 */
static inline int timer_stop(struct timespec *ts, clockid_t clid) {
    struct timespec now;
    const int rc = clock_gettime(clid, &now);
    if (rc != 0) {
        const int saved_errno = errno;
        SYSOUTF("Failed timer_stop due to '%s'\r\n", strerror(saved_errno));
        return rc;
    }

    time_t elapsed_sec = now.tv_sec - ts->tv_sec;
    long elapsed_nsec = now.tv_nsec - ts->tv_nsec;
    if (elapsed_nsec < 0) {
        /* Borrow one second so the nanosecond part stays non-negative. */
        elapsed_nsec += 1000000000L;
        elapsed_sec -= 1;
    }
    ts->tv_sec = elapsed_sec;
    ts->tv_nsec = elapsed_nsec;
    return 0;
}
/*
 * Return the hard RLIMIT_MEMLOCK limit of this process in bytes.
 *
 * An unlimited setting (RLIM_INFINITY), or any value that does not fit in a
 * long, is reported as LONG_MAX. A plain cast would turn RLIM_INFINITY into
 * -1, which is indistinguishable from the error return and would make callers
 * believe the limit is too small.
 *
 * Returns -1 only when getrlimit() itself fails.
 */
long get_rmem_limit() {
    struct rlimit lim;
    if (getrlimit(RLIMIT_MEMLOCK, &lim) != 0)
        return -1L;
    if (lim.rlim_max == RLIM_INFINITY || lim.rlim_max > (rlim_t)LONG_MAX)
        return LONG_MAX;
    return (long)lim.rlim_max;
}
/*
 * Open `path` read-only with large-file support; O_DIRECT is added when the
 * AIO strategy is selected. The open is retried for as long as it fails with
 * EINTR.
 *
 * Returns the file descriptor, or -1 with errno set by open().
 */
int open_source_file(char *path) {
    int flags = O_LARGEFILE | O_RDONLY;
    if (strategy == STRATEGY_AIO)
        flags |= O_DIRECT;

    for (;;) {
        const int fd = open(path, flags);
        if (fd != -1 || errno != EINTR)
            return fd;
    }
}
/*
 * Open `path` for reading and writing, creating it with mode 0666 if it does
 * not exist; O_DIRECT is added when the AIO strategy is selected. The open is
 * retried for as long as it fails with EINTR.
 *
 * Returns the file descriptor, or -1 with errno set by open().
 */
int open_dest_file(char *path) {
    int flags = O_CREAT | O_LARGEFILE | O_RDWR;
    if (strategy == STRATEGY_AIO)
        flags |= O_DIRECT;

    for (;;) {
        const int fd = open(path, flags, 0666);
        if (fd != -1 || errno != EINTR)
            return fd;
    }
}
/*
 * Return the size in bytes of the file behind `fd`, or -1 if fstat() fails.
 * When `block_size` is not NULL it receives st_blksize on success.
 *
 * Side effect: starts the global wall-clock timer (TIMER_START) before the
 * fstat() call.
 */
long get_file_size(int fd, long *block_size) {
    struct stat info;
    TIMER_START();
    if (fstat(fd, &info) != 0)
        return -1L;

    if (block_size != NULL)
        *block_size = info.st_blksize;
    return info.st_size;
}
/*
 * Make the file behind `fd` exactly `size` bytes long and reserve disk space
 * for it.
 *
 * The file is first truncated/extended to `size`, so a pre-existing, larger
 * destination does not keep a stale tail after the copy. A size of 0 is
 * handled without calling fallocate(), which rejects a zero length with
 * EINVAL and would make empty files impossible to copy.
 *
 * Returns 0 on success, -1 with errno set on failure.
 */
int allocate_file(int fd, long size) {
    if (size < 0) {
        errno = EINVAL;
        return -1;
    }

    int rc;
    do {
        rc = ftruncate(fd, size);
    } while (rc != 0 && errno == EINTR);
    if (rc != 0)
        return -1;

    if (size == 0)
        return 0;

    do {
        rc = fallocate(fd, 0, 0, size);
    } while (rc != 0 && errno == EINTR);
    return rc;
}
void preload_file_advise(int fd, long size, off_t base_offset) {
int res = posix_fadvise(fd, base_offset, size, POSIX_FADV_WILLNEED | POSIX_FADV_SEQUENTIAL);
if (res != 0) {
int e = errno;
SYSOUTF("[WARN] POSIX fadvise returned error '%s'\r\n", strerror(e));
}
}
void* mmap_file(int fd, long size, int mode, off_t base_offset = 0) {
preload_file_advise(fd, size, base_offset);
int PERF_FLAGS[] = { MAP_HUGETLB | MAP_LOCKED, MAP_HUGETLB, MAP_LOCKED, 0 };
void *addr = NULL;
int i = 0;
for (; i < 4; i++) {
addr = mmap(NULL, size, mode == 0 ? PROT_READ : PROT_WRITE, MAP_SHARED | MAP_POPULATE | PERF_FLAGS[i], fd, base_offset);
if (addr!= (void*)-1)
break;
}
if (addr == (void*)-1)
return NULL;
if (i!=0 && warn_mem_params_not_optimal) {
SYSOUT("[WARN] Memory access is not optimal (no huge TLB & pages locked into RAM)\r\n");
warn_mem_params_not_optimal = 0;
}
int res = posix_madvise(addr, size, POSIX_MADV_WILLNEED | POSIX_MADV_SEQUENTIAL);
if (res != 0) {
int e = errno;
SYSOUTF("[WARN] POSIX madvise returned error '%s'\r\n", strerror(e));
}
res = mlock(addr, size);
if (res != 0) {
int e = errno;
SYSOUTF("[WARN] POSIX mlock returned error '%s'\r\n", strerror(e));
}
return addr;
}
/*
 * Write `len` bytes from `buff` to `fd` at file offset `base_off`, then flush
 * the data to storage with fdatasync().
 *
 * Short writes are continued from the point reached (both the file offset
 * and the source pointer advance), and EINTR is retried. The decision to
 * continue is based on the pwrite() result, never on a stale errno.
 *
 * The function does not touch the global timer: the caller stops it once the
 * whole copy is finished. Stopping it here as well would make the caller's
 * own TIMER_STOP compute a meaningless value, because the stored timestamp
 * has already been replaced by an elapsed time.
 *
 * Returns 0 on success, otherwise a non-zero errno value (EIO if pwrite()
 * reports no progress without an error).
 */
int write_file(int fd, void *buff, long len, long base_off = 0) {
    const char *src = (const char *)buff;
    long done = 0;

    while (done < len) {
        ssize_t n = pwrite(fd, src + done, (size_t)(len - done), base_off + done);
        if (n > 0) {
            done += n;
            continue;
        }
        if (n < 0 && errno == EINTR)
            continue;
        /* n == 0 means no progress was made; report it instead of spinning. */
        return n < 0 ? errno : EIO;
    }

    if (fdatasync(fd) != 0)
        return errno;
    return 0;
}
/*
 * CLEANUP_EXIT: close the global source/destination descriptors and
 * terminate the process with `code`.
 *
 * The mappings in addr/addr2 are deliberately not unmapped here: they are
 * chunk-sized, their length is not known at this point, and unmapping
 * `size_in` bytes could tear down unrelated mappings right before exit().
 * Process termination releases all mappings anyway.
 *
 * CLEANUP_MMAP: unmap both global mappings (each `size` bytes long) and reset
 * the pointers to NULL.
 *
 * Both macros are wrapped in do { } while(0) so they behave as a single
 * statement, including in an unbraced if/else.
 */
#define CLEANUP_EXIT(code) do { if (fd_in!=-1) close(fd_in); if (fd_out!=-1) close(fd_out); exit(code); } while(0)
#define CLEANUP_MMAP(size) do { if (addr) { munmap(addr, (size)); addr = NULL; } if (addr2) { munmap(addr2, (size)); addr2 = NULL; } } while(0)
/*
 * Open the source (argv[1]) and destination (argv[2]) files, determine the
 * source size and block size, and pre-allocate the destination to that size.
 *
 * Results are stored in the globals fd_in, fd_out, size_in and blk_size;
 * err_in / err_out receive errno after the corresponding step. The
 * RLIMIT_MEMLOCK value is reported first, with a warning when it is below
 * 128 MB.
 *
 * Any failure prints an error and terminates the process through
 * CLEANUP_EXIT with codes -1 (open source), -2 (open destination),
 * -3 (size query) or -4 (allocation).
 */
void open_files(char **argv) {
    char *src_path = argv[1];
    char *dst_path = argv[2];

    const long memlock_limit = get_rmem_limit();
    SYSOUTF("[INFO] RLIMIT_MEMLOCK is %ldKB,\r\n", memlock_limit/1024);
    if (memlock_limit < 128*1024*1024) {
        SYSOUTF("[WARN] Please adjust memlock in /etc/security/limits.conf to something big like %d (KB), which is 32GB.\r\n", 32*ONE_MB);
    }

    fd_in = open_source_file(src_path);
    err_in = errno;
    if (fd_in == -1) {
        SYSOUTF("[ERROR] Cannot open source file %s due to error '%s'\r\n", src_path, strerror(err_in));
        CLEANUP_EXIT(-1);
    }

    fd_out = open_dest_file(dst_path);
    err_out = errno;
    if (fd_out == -1) {
        SYSOUTF("[ERROR] Cannot open destination file %s due to error '%s'\r\n", dst_path, strerror(err_out));
        CLEANUP_EXIT(-2);
    }

    size_in = get_file_size(fd_in, &blk_size);
    err_in = errno;
    if (size_in == -1L) {
        SYSOUTF("[ERROR] Unable determine file size for %s due to error '%s'\r\n", src_path, strerror(err_in));
        CLEANUP_EXIT(-3);
    }
    SYSOUTF("[INFO] File size for %s is %ld, block size is %ld.\r\n", src_path, size_in, blk_size);

    const int alloc_rc = allocate_file(fd_out, size_in);
    if (alloc_rc != 0) {
        err_out = errno;
        SYSOUTF("[ERROR] Cannot allocate file %s due to error '%s'\r\n", dst_path, strerror(err_out));
        CLEANUP_EXIT(-4);
    }
}
/* Mapping modes understood by mmap_file(): 0 maps for reading, 1 for writing. */
#define MMAP_IN 0
#define MMAP_OUT 1
/* Number of bytes mapped and copied per iteration; overridable at build time. */
#ifndef MMAP_CHUNK_SIZE
#define MMAP_CHUNK_SIZE (128*1024*1024)
#endif
/*
 * Copy the source file to the destination in MMAP_CHUNK_SIZE pieces.
 *
 * Every chunk of the source is mapped for reading. With STRATEGY_MMAP_WRITE
 * the mapped bytes are written to the destination with write_file(); with
 * STRATEGY_MMAP_BOTH the destination range is mapped as well, filled with
 * memcpy() and flushed with msync(MS_SYNC). Mappings are released after each
 * chunk.
 *
 * The wall-clock timer is stopped exactly once, after the last chunk.
 * TIMER_STOP replaces the stored start time with the elapsed time, so
 * stopping inside the loop and again afterwards would yield a meaningless
 * measurement.
 *
 * `argv` is used only for file names in error messages. Any failure prints an
 * error and terminates the process through CLEANUP_EXIT (codes -5 .. -8).
 */
void mmap_copy(char **argv) {
    for (long base = 0; base < size_in; base += MMAP_CHUNK_SIZE) {
        /* base < size_in is guaranteed by the loop condition, so size > 0. */
        long size = size_in - base;
        if (size > MMAP_CHUNK_SIZE)
            size = MMAP_CHUNK_SIZE;

        addr = mmap_file(fd_in, size, MMAP_IN, base);
        if (addr == NULL) {
            err_in = errno;
            SYSOUTF("[ERROR] Cannot load file %s into memory due to error '%s'\r\n", argv[1], strerror(err_in));
            CLEANUP_EXIT(-5);
        }

        if (strategy == STRATEGY_MMAP_WRITE) {
            int res = write_file(fd_out, addr, size, base);
            if (res != 0) {
                SYSOUTF("[ERROR] Could not write file %s due to error '%s'\r\n", argv[2], strerror(res));
                CLEANUP_EXIT(-6);
            }
        }
        else if (strategy == STRATEGY_MMAP_BOTH) {
            addr2 = mmap_file(fd_out, size, MMAP_OUT, base);
            if (addr2 == NULL) {
                err_out = errno;
                SYSOUTF("[ERROR] Cannot load file %s into memory due to error '%s'\r\n", argv[2], strerror(err_out));
                CLEANUP_EXIT(-7);
            }
            memcpy(addr2, addr, size);
            if (msync(addr2, size, MS_SYNC) != 0) {
                err_out = errno;
                SYSOUTF("[ERROR] Cannot sync file %s due to error '%s'\r\n", argv[2], strerror(err_out));
                CLEANUP_EXIT(-8);
            }
        }
        CLEANUP_MMAP(size);
    }
    TIMER_STOP();
}
/*
 * Copy size_in bytes from fd_in to fd_out with sendfile(2).
 *
 * EAGAIN and EINTR are retried. A return value of 0 from sendfile() means the
 * source ended before size_in bytes were transferred; the loop then stops and
 * the shortfall is reported instead of spinning forever.
 *
 * On success the wall-clock timer is stopped and 1 is returned. On failure an
 * error is printed, err_out holds the errno value, and -1 is returned.
 */
int sendfile_copy() {
    err_out = 0;
    long to_write = size_in;

    while (to_write > 0) {
        ssize_t sent = sendfile(fd_out, fd_in, NULL, to_write);
        if (sent > 0) {
            to_write -= sent;
            continue;
        }
        if (sent == 0)
            break; /* premature end of input; reported below */

        err_out = errno;
        if (err_out == EAGAIN || err_out == EINTR)
            continue;
        SYSOUTF("[ERROR] Function sendfile is not supported '%s'\r\n", strerror(err_out));
        return -1;
    }

    if (to_write != 0) {
        SYSOUTF("[ERROR] Function sendfile written too small data: %ld vs %ld, error=%s\r\n", size_in - to_write, size_in, strerror(err_out));
        return -1;
    }
    TIMER_STOP();
    return 1;
}
/* Buffer size in bytes for the read()/write() loop; overridable at build time. */
#ifndef COPY_BLOCK_SIZE
#define COPY_BLOCK_SIZE 4096
#endif
/*
 * Copy fd_in to fd_out with a plain read()/write() loop using a
 * COPY_BLOCK_SIZE stack buffer, then stop the wall-clock timer.
 *
 * - End of input is detected solely by read() returning 0; errno is only
 *   consulted after a call has actually failed.
 * - A read() or write() interrupted by a signal (EINTR) is retried.
 * - After a short write the remaining bytes are written from the position
 *   reached in the buffer, not from its start.
 *
 * A read error terminates the process with code -9, a write error with -10.
 */
void serial_copy() {
    char buff[COPY_BLOCK_SIZE];

    preload_file_advise(fd_in, size_in, 0);

    for (;;) {
        ssize_t got = read(fd_in, buff, COPY_BLOCK_SIZE);
        if (got == 0)
            break; /* end of file */
        if (got < 0) {
            if (errno == EINTR)
                continue;
            err_in = errno;
            SYSOUTF("[ERROR] Broken reading of input file '%s'\r\n", strerror(err_in));
            CLEANUP_EXIT(-9);
        }

        const char *pos = buff;
        ssize_t left = got;
        while (left > 0) {
            ssize_t put = write(fd_out, pos, (size_t)left);
            if (put < 0) {
                err_out = errno;
                if (err_out == EINTR)
                    continue;
                SYSOUTF("[ERROR] Broken writing to output file '%s'\r\n", strerror(err_out));
                CLEANUP_EXIT(-10);
            }
            pos += put;
            left -= put;
        }
    }
    TIMER_STOP();
}
/* Size in bytes of one AIO request, and the number of requests per batch. */
#define PG_SIZE 4096
#define AIO_MAX_NR 1024
#ifdef AIO_VIA_SYSCALL
/*
 * Thin wrappers around the raw Linux AIO system calls, used when libaio is
 * not available. They follow the syscall(2) convention: -1 is returned and
 * errno is set on failure. Every call that operates on a context passes the
 * context as the first syscall argument, as the kernel interface requires.
 */
inline int io_setup(unsigned nr, aio_context_t *ctxp) {
    return syscall(__NR_io_setup, nr, ctxp);
}
inline int io_destroy(aio_context_t ctx) {
    return syscall(__NR_io_destroy, ctx);
}
inline int io_submit(aio_context_t ctx, long nr, struct iocb **cbp) {
    return syscall(__NR_io_submit, ctx, nr, cbp);
}
inline int io_cancel(aio_context_t ctx, struct iocb *cb, struct io_event *result) {
    return syscall(__NR_io_cancel, ctx, cb, result);
}
inline int io_getevents(aio_context_t ctx, long min_nr, long nr, struct io_event *events, struct timespec *timeout) {
    return syscall(__NR_io_getevents, ctx, min_nr, nr, events, timeout);
}
/* Initialise `cb` as a read of `size` bytes at `offset` of `fd` into `buff`. */
inline void io_prep_pread(struct iocb *cb, int fd, void *buff, long size, long offset) {
    /* Clear stale fields left over from a previous use of this control block. */
    memset(cb, 0, sizeof(*cb));
    cb->aio_fildes = fd;
    cb->aio_lio_opcode = IOCB_CMD_PREAD;
    cb->aio_buf = (uint64_t)(uintptr_t)buff;
    cb->aio_offset = offset;
    cb->aio_nbytes = size;
}
#else
/* With libaio, provide the kernel-style opcode names and context type used by aio_copy(). */
#define IOCB_CMD_PREAD 0
#define IOCB_CMD_PWRITE 1
#define IOCB_CMD_FSYNC 2
#define IOCB_CMD_FDSYNC 3
#define IOCB_CMD_NOOP 6
#define aio_context_t io_context_t
#endif
/*
 * Error number of a failed AIO call. The raw syscall wrappers return -1 and
 * set errno, while libaio returns the negated error number directly.
 */
#ifdef AIO_VIA_SYSCALL
#define AIO_ERRNO(rc) (errno)
#else
#define AIO_ERRNO(rc) (-(rc))
#endif
/*
 * Copy fd_in to fd_out with Linux asynchronous I/O.
 *
 * The file is processed in batches of up to AIO_MAX_NR requests of PG_SIZE
 * bytes. All reads of a batch are submitted at once; every completed read is
 * turned into a write of the same buffer at the same offset and resubmitted.
 * A batch is finished when all its reads and writes have completed.
 *
 * Both descriptors are opened with O_DIRECT for this strategy, therefore:
 * - the data buffer is allocated PG_SIZE-aligned;
 * - every request transfers a whole PG_SIZE block. The final, partial block
 *   is zero-padded, written in full, and the destination is trimmed back to
 *   size_in afterwards.
 *
 * Chunk and batch counts use ceiling division so that a trailing partial
 * chunk or batch is never dropped. The result of every completion event is
 * checked.
 *
 * Any failure prints an error and terminates the process through
 * CLEANUP_EXIT (codes -11 .. -15 and -17 .. -19). On success the wall-clock
 * timer is stopped.
 */
void aio_copy() {
    const long group_bytes = (long)PG_SIZE * AIO_MAX_NR;
    long chunks = (size_in + PG_SIZE - 1) / PG_SIZE;
    SYSOUTF("[INFO] AIO copy needs %ld chunks\r\n", chunks);
    aio_context_t ctx = 0;
    struct iocb* cbs = (struct iocb*)malloc(AIO_MAX_NR * sizeof(struct iocb));
    struct iocb** cbps = (struct iocb**)malloc(AIO_MAX_NR * sizeof(struct iocb*));
    struct iocb** wcbps = (struct iocb**)malloc(AIO_MAX_NR * sizeof(struct iocb*));
    struct io_event* ioevents = (struct io_event*)malloc(AIO_MAX_NR * sizeof(struct io_event));
    if (cbs==NULL || cbps==NULL || wcbps==NULL || ioevents==NULL) {
        int ret = errno;
        SYSOUTF("[ERROR] Broken malloc '%s'\r\n", strerror(ret));
        CLEANUP_EXIT(-12);
    }
    int ret = io_setup(AIO_MAX_NR, &ctx);
    if (ret < 0) {
        ret = AIO_ERRNO(ret);
        SYSOUTF("[ERROR] Broken io_setup for %ld chunks '%s'\r\n", chunks, strerror(ret));
        CLEANUP_EXIT(-11);
    }
    /* O_DIRECT needs an aligned buffer; posix_memalign returns the error number. */
    void *bigbuff = NULL;
    ret = posix_memalign(&bigbuff, PG_SIZE, AIO_MAX_NR*PG_SIZE);
#define DELETE_MEM do { if (bigbuff!=NULL) free(bigbuff); free(cbs); free(cbps); free(wcbps); free(ioevents); io_destroy(ctx); } while(0)
    if (ret != 0) {
        bigbuff = NULL;
        SYSOUTF("[ERROR] Buffer allocation failed for input file: '%s'\r\n", strerror(ret));
        DELETE_MEM;
        CLEANUP_EXIT(-12);
    }
    int rounds = (int)((size_in + group_bytes - 1) / group_bytes);
#define AIO_GROUP_TO_OFFSET(i) ((long)(i) * PG_SIZE * AIO_MAX_NR)
#define AIO_STEP_TO_OFFSET(i) ((long)(i) * PG_SIZE)
    for (int i=0; i < AIO_MAX_NR; i++) {
        memset(&cbs[i], 0, sizeof(struct iocb));
        cbps[i] = &cbs[i];
    }
    for (int group=0; group < rounds; group++) {
        /* Prepare one read per PG_SIZE block of this batch. */
        int step = 0;
        for (; step < AIO_MAX_NR; step++) {
            long curr_off = AIO_GROUP_TO_OFFSET(group) + AIO_STEP_TO_OFFSET(step);
            long remaining = size_in - curr_off;
            if (remaining <= 0) /* past the end of the file */
                break;
            char *page = (char*)bigbuff + AIO_STEP_TO_OFFSET(step);
            if (remaining < PG_SIZE)
                memset(page, 0, PG_SIZE); /* padding of the final block stays zero */
            io_prep_pread(&cbs[step], fd_in, (void*)page, PG_SIZE, curr_off);
        }
        if (step == 0)
            break;
        int todo = io_submit(ctx, step, cbps);
        if (todo != step) {
            ret = todo < 0 ? AIO_ERRNO(todo) : 0;
            SYSOUTF("[ERROR] Broken io_submit, group=%d, queued=%d; '%s'\r\n", group, todo >= 0 ? todo : -1, todo < 0 ? strerror(ret) : "failure for given index");
            DELETE_MEM;
            CLEANUP_EXIT(-13);
        }
        int wi = 0;
        while (todo > 0) {
            ret = io_getevents(ctx, 1, todo, ioevents, NULL);
            if (ret < 0) {
                ret = AIO_ERRNO(ret);
                if (ret == EINTR)
                    continue;
                SYSOUTF("[ERROR] Broken io_getevents '%s'\r\n", strerror(ret));
                DELETE_MEM;
                CLEANUP_EXIT(-14);
            }
            todo -= ret;
            int to_write = 0;
            for (int i=0; i < ret; i++) {
                struct iocb *iocbp = (struct iocb*) ioevents[i].obj;
                long done = (long) ioevents[i].res; /* bytes transferred, or negated errno */
                long off = AIO_GROUP_TO_OFFSET(group) + AIO_STEP_TO_OFFSET(iocbp - cbs);
                if (iocbp->aio_lio_opcode == IOCB_CMD_PREAD) {
                    long expected = size_in - off;
                    if (expected > PG_SIZE)
                        expected = PG_SIZE;
                    if (done < expected) {
                        int e = done < 0 ? (int)-done : EIO;
                        SYSOUTF("[ERROR] Broken AIO read at offset %ld '%s'\r\n", off, strerror(e));
                        DELETE_MEM;
                        CLEANUP_EXIT(-17);
                    }
                    /* Reuse the control block: same buffer, offset and length, now as a write. */
                    iocbp->aio_fildes = fd_out;
                    iocbp->aio_lio_opcode = IOCB_CMD_PWRITE;
                    wcbps[wi++] = iocbp;
                    to_write++;
                }
                else if (done != PG_SIZE) {
                    int e = done < 0 ? (int)-done : EIO;
                    SYSOUTF("[ERROR] Broken AIO write at offset %ld '%s'\r\n", off, strerror(e));
                    DELETE_MEM;
                    CLEANUP_EXIT(-18);
                }
            }
            if (to_write > 0) {
                int e = io_submit(ctx, to_write, &wcbps[wi - to_write]); /* move along linearly */
                if (e != to_write) {
                    ret = e < 0 ? AIO_ERRNO(e) : 0;
                    SYSOUTF("[ERROR] Broken io_submit for write '%s'\r\n", e < 0 ? strerror(ret) : "failure for given index");
                    DELETE_MEM;
                    CLEANUP_EXIT(-15);
                }
                todo += to_write;
            }
        }
    }
    /* The last block was written in full; cut the padding off again. */
    if (size_in % PG_SIZE != 0 && ftruncate(fd_out, size_in) != 0) {
        ret = errno;
        SYSOUTF("[ERROR] Cannot trim output file to %ld bytes '%s'\r\n", size_in, strerror(ret));
        DELETE_MEM;
        CLEANUP_EXIT(-19);
    }
    DELETE_MEM;
    TIMER_STOP();
}
/*
 * Entry point: copy argv[1] to argv[2] using the strategy selected by an
 * optional fourth argument (--strategy=mmap|serial|sendfile|aio). Without a
 * recognised option the MMAP+WRITE strategy is used.
 *
 * With fewer than two file arguments the usage text is printed and 0 is
 * returned. Otherwise the files are opened, the copy is performed, the
 * strategy name and wall time are printed and the process exits with 0. The
 * copy routines terminate the process themselves on error, except
 * sendfile_copy(), whose failure result is checked here (exit code -16) so
 * that a failed copy is not reported as a success.
 */
int main(int argc, char **argv) {
    if (argc < 3) {
        SYSOUTF("Usage: %s SRC_FILE DST_FILE --strategy={mmap,serial,sendfile,aio,default}\r\n", argv[0]);
        return 0;
    }
    if (argc == 4 && strncmp(argv[3], "--strategy=mmap", 15) == 0)
        strategy = STRATEGY_MMAP_BOTH;
    else if (argc == 4 && strncmp(argv[3], "--strategy=serial", 17) == 0)
        strategy = STRATEGY_SERIAL;
    else if (argc == 4 && strncmp(argv[3], "--strategy=sendfile", 19) == 0)
        strategy = STRATEGY_SENDFILE;
    else if (argc == 4 && strncmp(argv[3], "--strategy=aio", 14) == 0)
        strategy = STRATEGY_AIO;
    open_files(argv);
    if (strategy == STRATEGY_SERIAL) {
        serial_copy();
    }
    else if (strategy == STRATEGY_SENDFILE) {
        /* sendfile_copy() reports failure by return value instead of exiting. */
        if (sendfile_copy() < 0) {
            CLEANUP_EXIT(-16);
        }
    }
    else if (strategy == STRATEGY_AIO) {
        aio_copy();
    }
    else {
        mmap_copy(argv);
    }
    SYSOUTF("\r\n[INFO] Success. Strategy was %s\r\n", STRATEGY_DESC[strategy]);
    PRINT_WALL_TIME();
    CLEANUP_EXIT(0);
    return 0;
}
czwartek, grudnia 18, 2014
How to copy an EMS datastore for inter-DC transport? It ought to be _fast_.
Subskrybuj:
Komentarze do posta (Atom)
0 komentarze:
Prześlij komentarz