*** a/numad.c 2012-02-21 08:38:29.752464434 -0500 --- b/numad.c 2012-02-21 09:44:42.439609291 -0500 *************** *** 20,27 **** ! // compile with: gcc -g -std=gnu99 -o numad numad.c ! // (will need to add pthreads in subsequent versions) --- 20,82 ---- ! // Compile with: gcc -O -std=gnu99 -pthread -o numad numad.c ! ! ! ! ! #if 0 ! /* ! ! TODO: ! ===== ! ! - extend dynamic intervals? ! ! - hi / lo watermarks, and ring buffer for processor cpu_used ! ! - Write doc intro, explaining the main purposes and the design ! - Set up a public git repo so others can access ! ! - Verify threshold and limit settings. Make sure all tunables are tunable.... ! - Verify completely stateless design: numad can stop start stop start at will -- and still improve things -- no history required. ! - Need to factor HTT out of CPUs -- want whole cores ! - Fix current hacky multinode allocations ! ! - Add weighted tally field to node struct that sums up resorces + node[i].dist * node[i].resources. ! Use this to order the available resource nodes? ! Should communicate total cost of resources starting from that node.... ! Might need additional that includes actual absolute unscaled totals ! ! - command line -p option for immed attention short cut to short list. Need internal "force" flag? ! - cmd line option for alternate cpuset directory ! ! - consider interleaved memory when multinode? ! ! - Do we need followup balancer pass to examine and improve balance on a per node basis? ! ! - do we need to maintain summary info about what is already bound to each node? ! ! - verify scenarios requiring all nodes... ! ! - verify scenarios that might induce oscillation ! ! - verify scenarios on oversubscribed machines. ! ! - make more thread safe. eliminate some static variables where needed. Fix mutex usage. ! ! - fs and fgets checks in same conditional ! ! - initting points in subroutine ! ! - verify option arguements and require to fit in bounds -- e.g. percent must be 0-100, etc. ! ! - add a thread that slowly cleans up obsolete cpusets ! ! ! */ ! #endif ! *************** *** 42,49 **** --- 98,110 ---- #include #include #include + #include #include + #include + #include + #include #include + #include #include #include #include *************** *** 52,65 **** ! #define VERSION_STRING "20120214" ! ! #if ((FEDORA) || (RHEL7)) ! # define CPUSET_DIR "/sys/fs/cgroup/cpuset" ! #else ! # define CPUSET_DIR "/cgroup/cpuset" ! #endif #define QEMU_CPUSET_DIR "/cgroup/cpuset/libvirt/qemu" --- 113,124 ---- ! #define VERSION_STRING "20120220" ! char cpuset_dir_new[] = "/sys/fs/cgroup/cpuset"; ! char cpuset_dir_old[] = "/cgroup/cpuset"; ! /* default to new */ ! char *cpuset_dir = cpuset_dir_new; #define QEMU_CPUSET_DIR "/cgroup/cpuset/libvirt/qemu" *************** *** 84,113 **** #define CPU_THRESHOLD 30 #define MEMORY_THRESHOLD 300 ! #define MIN_INTERVAL 2 ! #define MAX_INTERVAL 10 ! #define TARGET_UTILIZATION_PERCENT 90 ! #define ELIM_NEW_LINE(s) if (s[strlen(s) - 1] == '\n') s[strlen(s) - 1] = '\0' - int debug = 0; - int quiet = 0; - int verbose = 0; int num_cpus = 0; int num_nodes = 0; int cur_cpu_data_buf = 0; int page_size_in_bytes = 0; int huge_page_size_in_bytes = 0; int max_interval = MAX_INTERVAL; int target_utilization = TARGET_UTILIZATION_PERCENT; ! int req_mbs = 0; ! int req_cpus = 0; FILE *log_fs = NULL; --- 143,317 ---- #define CPU_THRESHOLD 30 #define MEMORY_THRESHOLD 300 ! #define MIN_INTERVAL 5 ! #define MAX_INTERVAL 15 ! #define TARGET_UTILIZATION_PERCENT 85 ! + #define ELIM_NEW_LINE(s) \ + if (s[strlen(s) - 1] == '\n') { \ + s[strlen(s) - 1] = '\0'; \ + } ! #define CONVERT_DIGITS_TO_NUM(p, n) \ ! n = *p++ - '0'; \ ! while (isdigit(*p)) { \ ! n *= 10; \ ! n += (*p++ - '0'); \ ! } int num_cpus = 0; int num_nodes = 0; int cur_cpu_data_buf = 0; int page_size_in_bytes = 0; int huge_page_size_in_bytes = 0; + int min_interval = MIN_INTERVAL; int max_interval = MAX_INTERVAL; int target_utilization = TARGET_UTILIZATION_PERCENT; ! pthread_mutex_t mutex; ! int requested_mbs = 0; ! int requested_cpus = 0; FILE *log_fs = NULL; + int log_level = LOG_NOTICE; + + void numad_log(int level, const char *fmt, ...) { + if (level > log_level) { + return; + // Logging levels (from sys/syslog.h) + // #define LOG_EMERG 0 /* system is unusable */ + // #define LOG_ALERT 1 /* action must be taken immediately */ + // #define LOG_CRIT 2 /* critical conditions */ + // #define LOG_ERR 3 /* error conditions */ + // #define LOG_WARNING 4 /* warning conditions */ + // #define LOG_NOTICE 5 /* normal but significant condition */ + // #define LOG_INFO 6 /* informational */ + // #define LOG_DEBUG 7 /* debug-level messages */ + } + char buf[BUF_SIZE]; + time_t ts = time(NULL); + sprintf(buf, ctime(&ts)); + char *p = &buf[strlen(buf) - 1]; + *p++ = ':'; + *p++ = ' '; + va_list ap; + va_start(ap, fmt); + vsnprintf(p, BUF_SIZE, fmt, ap); + va_end(ap); + fprintf(log_fs, "%s", buf); + fflush(log_fs); + } + + void open_log_file() { + log_fs = fopen(VAR_LOG_FILE, "a"); + if (log_fs == NULL) { + log_fs = stderr; + numad_log(LOG_ERR, "Cannot open numad log file -- using stderr\n"); + } + } + + void close_log_file() { + if (log_fs != NULL) { + fclose(log_fs); + log_fs = NULL; + } + } + + + + #define MSG_BODY_TEXT_SIZE 48 + + typedef struct msg_body { + long src_pid; + long cmd; + long arg1; + long arg2; + char text[MSG_BODY_TEXT_SIZE]; + } msg_body_t, *msg_body_p; + + typedef struct msg { + long dst_pid; // msg mtype is dest PID + msg_body_t body; + } msg_t, *msg_p; + + int msg_qid; + + void flush_msg_queue() { + msg_t msg; + do { + msgrcv(msg_qid, &msg, sizeof(msg_body_t), 0, IPC_NOWAIT); + } while (errno != ENOMSG); + } + + void init_msg_queue() { + key_t msg_key = 0xdeadbeef; + int msg_flg = 0660 | IPC_CREAT; + msg_qid = msgget(msg_key, msg_flg); + if (msg_qid < 0) { + numad_log(LOG_CRIT, "msgget failed\n"); + exit(EXIT_FAILURE); + } + flush_msg_queue(); + } + + void recv_msg(msg_p m) { + if (msgrcv(msg_qid, m, sizeof(msg_body_t), getpid(), 0) < 0) { + numad_log(LOG_CRIT, "msgrcv failed\n"); + exit(EXIT_FAILURE); + } + // printf("Recieved: >>%s<< from process %d\n", m->body.text, m->body.src_pid); + } + + void send_msg(long dst_pid, long cmd, long arg1, long arg2, char *s) { + msg_t msg; + msg.dst_pid = dst_pid; + msg.body.src_pid = getpid(); + msg.body.cmd = cmd; + msg.body.arg1 = arg1; + msg.body.arg2 = arg2; + strcpy(msg.body.text, s); + size_t len = sizeof(msg_body_t) - MSG_BODY_TEXT_SIZE + strlen(msg.body.text) + 1; + if (msgsnd(msg_qid, &msg, len, IPC_NOWAIT) < 0) { + numad_log(LOG_CRIT, "msgsnd failed\n"); + exit(EXIT_FAILURE); + } + // printf("Sent: >>%s<< to process %d\n", msg.body.text, msg.dst_pid); + } + + + + void shut_down_numad() { + numad_log(LOG_NOTICE, "Shutting down numad\n"); + flush_msg_queue(); + unlink(VAR_RUN_FILE); + close_log_file(); + exit(EXIT_SUCCESS); + } + + + void print_version_and_exit(char *prog_name) { + fprintf(stdout, "%s version: %s: compiled %s\n", prog_name, VERSION_STRING, __DATE__); + exit(EXIT_SUCCESS); + } + + + void print_usage_and_exit(char *prog_name) { + fprintf(stderr, "Usage: %s ...\n", prog_name); + fprintf(stderr, "-h to print this usage info\n"); + fprintf(stderr, "-i [:] to specify interval seconds\n"); + fprintf(stderr, "-l to specify logging level (usually 5, 6, or 7)\n"); + fprintf(stderr, "-u to specify target utilization percent\n"); + fprintf(stderr, "-v for verbose (same effect as '-l 6'\n"); + fprintf(stderr, "-V to show version info\n"); + fprintf(stderr, "-w [:] for node suggestions\n"); + exit(EXIT_FAILURE); + } + + + *************** *** 121,129 **** #define INIT_ID_LIST(list_p) \ list_p = malloc(sizeof(id_list_t)); \ ! if (list_p == NULL) { perror("malloc"); exit(EXIT_FAILURE); } \ list_p->set_p = CPU_ALLOC(num_cpus); \ ! if (list_p->set_p == NULL) { perror("CPU_ALLOC"); exit(EXIT_FAILURE); } \ list_p->bytes = CPU_ALLOC_SIZE(num_cpus); #define CLEAR_LIST(list_p) \ --- 325,333 ---- #define INIT_ID_LIST(list_p) \ list_p = malloc(sizeof(id_list_t)); \ ! if (list_p == NULL) { numad_log(LOG_CRIT, "INIT_ID_LIST malloc failed\n"); exit(EXIT_FAILURE); } \ list_p->set_p = CPU_ALLOC(num_cpus); \ ! if (list_p->set_p == NULL) { numad_log(LOG_CRIT, "CPU_ALLOC failed\n"); exit(EXIT_FAILURE); } \ list_p->bytes = CPU_ALLOC_SIZE(num_cpus); #define CLEAR_LIST(list_p) \ *************** *** 151,157 **** int add_ids_to_list_from_str(id_list_p list_p, char *s) { if (list_p == NULL) { ! perror("Cannot add to NULL list"); exit(EXIT_FAILURE); } if ((s == NULL) || (strlen(s) == 0)) { --- 355,361 ---- int add_ids_to_list_from_str(id_list_p list_p, char *s) { if (list_p == NULL) { ! numad_log(LOG_CRIT, "Cannot add to NULL list\n"); exit(EXIT_FAILURE); } if ((s == NULL) || (strlen(s) == 0)) { *************** *** 169,180 **** in_range = 1; } } ! // convert consecutive digits to an ID ! int id = *s++ - '0'; ! while (isdigit(*s)) { ! id *= 10; ! id += (*s++ - '0'); ! } if (!in_range) { next_id = id; } --- 373,380 ---- in_range = 1; } } ! int id; ! CONVERT_DIGITS_TO_NUM(s, id); if (!in_range) { next_id = id; } *************** *** 190,196 **** int str_from_id_list(char *str_p, int str_size, id_list_p list_p) { char *p = str_p; if ((p == NULL) || (str_size < 3)) { ! perror("Bad string for ID listing"); exit(EXIT_FAILURE); } int n; --- 390,396 ---- int str_from_id_list(char *str_p, int str_size, id_list_p list_p) { char *p = str_p; if ((p == NULL) || (str_size < 3)) { ! numad_log(LOG_CRIT, "Bad string for ID listing\n"); exit(EXIT_FAILURE); } int n; *************** *** 317,345 **** typedef struct process_data { ! // Most process stats are derived from /proc//stat info int pid; ! int pgrp; ! char *comm_name; uint64_t utime; uint64_t stime; ! int64_t priority; ! int64_t nice; int64_t num_threads; ! uint64_t starttime; ! uint64_t vsize_MBs; uint64_t rss_MBs; int processor; ! unsigned rt_priority; ! unsigned policy; ! uint64_t guest_time; ! // Miscellaneous other per-process data uint64_t uptime; // Data timestamp from /proc/uptime uint64_t MBs_used; uint64_t CPUs_used; char *cpuset_name; int dup_bind_count; ! id_list_p node_list_p; } process_data_t, *process_data_p; #define PROCESS_HASH_TABLE_SIZE 2003 --- 520,548 ---- typedef struct process_data { ! // Most process stats are derived from /proc//stat info -- subset of ! // stat_data above. Currently use only about half of these... int pid; ! // int pgrp; ! char *comm; uint64_t utime; uint64_t stime; ! // int64_t priority; ! // int64_t nice; int64_t num_threads; ! // uint64_t starttime; ! // uint64_t vsize_MBs; uint64_t rss_MBs; int processor; ! // unsigned rt_priority; ! // unsigned policy; ! // uint64_t guest_time; uint64_t uptime; // Data timestamp from /proc/uptime uint64_t MBs_used; uint64_t CPUs_used; char *cpuset_name; int dup_bind_count; ! // id_list_p node_list_p; } process_data_t, *process_data_p; #define PROCESS_HASH_TABLE_SIZE 2003 *************** *** 385,391 **** } process_hash_collisions += 1; if (ix == starting_ix) { ! perror("Process hash table is full"); // FIXME: do something here, or preferrably much sooner, // perhaps when some collisions threshold is passed return -1; --- 588,594 ---- } process_hash_collisions += 1; if (ix == starting_ix) { ! numad_log(LOG_ERR, "Process hash table is full\n"); // FIXME: do something here, or preferrably much sooner, // perhaps when some collisions threshold is passed return -1; *************** *** 407,432 **** uint64_t time_diff = newp->uptime - p->uptime; p->CPUs_used = 100 * (utime_diff + stime_diff) / time_diff; } ! if ((!p->comm_name) || (strcmp(p->comm_name, newp->comm_name))) { ! if (p->comm_name) { ! free(p->comm_name); ! } ! p->comm_name = strdup(newp->comm_name); ! } ! p->pgrp = newp->pgrp; ! p->nice = newp->nice; ! p->policy = newp->policy; ! p->priority = newp->priority; p->processor = newp->processor; p->rss_MBs = newp->rss_MBs; ! p->vsize_MBs = newp->vsize_MBs; ! p->rt_priority = newp->rt_priority; ! p->starttime = newp->starttime; p->stime = newp->stime; p->num_threads = newp->num_threads; p->uptime = newp->uptime; p->utime = newp->utime; ! p->guest_time = newp->guest_time; } return ix; } --- 610,635 ---- uint64_t time_diff = newp->uptime - p->uptime; p->CPUs_used = 100 * (utime_diff + stime_diff) / time_diff; } ! if ((!p->comm) || (strcmp(p->comm, newp->comm))) { ! if (p->comm) { ! free(p->comm); ! } ! p->comm = strdup(newp->comm); ! } ! // p->pgrp = newp->pgrp; ! // p->nice = newp->nice; ! // p->policy = newp->policy; ! // p->priority = newp->priority; p->processor = newp->processor; p->rss_MBs = newp->rss_MBs; ! // p->vsize_MBs = newp->vsize_MBs; ! // p->rt_priority = newp->rt_priority; ! // p->starttime = newp->starttime; p->stime = newp->stime; p->num_threads = newp->num_threads; p->uptime = newp->uptime; p->utime = newp->utime; ! // p->guest_time = newp->guest_time; } return ix; } *************** *** 451,465 **** if (ix >= 0) { // remove the target process_data_p dp = &process_hash_table[ix]; ! if (dp->comm_name) { free(dp->comm_name); } if (dp->cpuset_name) { free(dp->cpuset_name); } ! if (dp->node_list_p) { FREE_LIST(dp->node_list_p); } memset(dp, 0, sizeof(process_data_t)); // bubble up the collision chain while (pid = process_hash_table[++ix].pid) { if (process_hash_lookup(pid) < 0) { if (process_hash_rehash(ix) < 0) { ! perror("rehash fail"); } } } --- 654,668 ---- if (ix >= 0) { // remove the target process_data_p dp = &process_hash_table[ix]; ! if (dp->comm) { free(dp->comm); } if (dp->cpuset_name) { free(dp->cpuset_name); } ! // if (dp->node_list_p) { FREE_LIST(dp->node_list_p); } memset(dp, 0, sizeof(process_data_t)); // bubble up the collision chain while (pid = process_hash_table[++ix].pid) { if (process_hash_lookup(pid) < 0) { if (process_hash_rehash(ix) < 0) { ! numad_log(LOG_ERR, "rehash fail\n"); } } } *************** *** 472,548 **** - FILE *open_log_file() { - log_fs = fopen(VAR_LOG_FILE, "a"); - return log_fs; - } - - - int numad_log(const char *fmt, ...) { - if (log_fs == NULL) { - if (open_log_file() == NULL) { - perror("Cannot open log file"); - exit(EXIT_FAILURE); - } - } - va_list ap; - va_start(ap, fmt); - int rc = vfprintf(log_fs, fmt, ap); - va_end(ap); - fflush(log_fs); - return rc; - } - - - void close_log_file() { - if (log_fs != NULL) { - fclose(log_fs); - log_fs = NULL; - } - } - - - void shut_down_numad() { - unlink(VAR_RUN_FILE); - close_log_file(); - exit(EXIT_SUCCESS); - } - - - - void print_version_and_exit(char *prog_name) { - fprintf(stdout, "%s version: %s: compiled %s\n", prog_name, VERSION_STRING, __DATE__); - exit(EXIT_SUCCESS); - } - - - void print_usage_and_exit(char *prog_name) { - fprintf(stderr, "Usage: %s ...\n", prog_name); - fprintf(stderr, "-d for debug\n"); - fprintf(stderr, "-h to print this usage info\n"); - fprintf(stderr, "-i to specify second max_interval\n"); - // fprintf(stderr, "-q for quiet\n"); - fprintf(stderr, "-u to specify target utilization percent\n"); - fprintf(stderr, "-v for verbose\n"); - fprintf(stderr, "-V to show version info\n"); - fprintf(stderr, "-w [:] for node suggestions\n"); - exit(EXIT_FAILURE); - } void check_prereqs(char *prog_name) { // Verify cpusets are available on this system. ! if (access(CPUSET_DIR, F_OK) < 0) { ! fprintf(stderr, "\n"); ! fprintf(stderr, "Are CPUSETs enabled on this system?\n"); ! fprintf(stderr, "They are required for %s to function.\n", prog_name); ! fprintf(stderr, "Check manpage CPUSET(7). You might need to do something like:\n"); ! fprintf(stderr, " # mkdir %s\n", CPUSET_DIR); ! fprintf(stderr, " # mount cgroup -t cgroup -o cpuset %s\n", CPUSET_DIR); ! fprintf(stderr, "and then try again...\n"); ! fprintf(stderr, "\n"); ! // FIXME: should provide cmd line option to specify alternate cpuset dir path ! exit(EXIT_FAILURE); } // Karl Rister says: FYI, we ended up tuning khugepaged -- to // more aggressively re-build 2MB pages after VM memory migrations -- like this: --- 675,698 ---- void check_prereqs(char *prog_name) { // Verify cpusets are available on this system. ! if (access(cpuset_dir, F_OK) < 0) { ! cpuset_dir = cpuset_dir_old; ! if (access(cpuset_dir, F_OK) < 0) { ! fprintf(stderr, "\n"); ! fprintf(stderr, "Are CPUSETs enabled on this system?\n"); ! fprintf(stderr, "They are required for %s to function.\n", prog_name); ! fprintf(stderr, "Check manpage CPUSET(7). You might need to do something like:\n"); ! fprintf(stderr, " # mkdir %s (or %s)\n", cpuset_dir_new, cpuset_dir_old); ! fprintf(stderr, " # mount cgroup -t cgroup -o cpuset %s (or %s)\n", cpuset_dir_new, cpuset_dir_old); ! fprintf(stderr, "and then try again...\n"); ! fprintf(stderr, "\n"); ! // FIXME: should provide cmd line option to specify alternate cpuset dir path ! exit(EXIT_FAILURE); ! } } // Karl Rister says: FYI, we ended up tuning khugepaged -- to // more aggressively re-build 2MB pages after VM memory migrations -- like this: *************** *** 561,578 **** int bytes = read(fd, buf, BUF_SIZE); close(fd); if (bytes > 0) { - // convert consecutive digits to a number char *p = buf; ! ms = *p++ - '0'; ! while (isdigit(*p)) { ! ms *= 10; ! ms += (*p++ - '0'); ! } } if (ms > 100) { fprintf(stderr, "\n"); ! fprintf(stderr, "Looks like transparent hugepage scan time in %s is %d ms.\n", thp_scan_fname, ms); ! numad_log("Looks like transparent hugepage scan time in %s is %d ms.\n", thp_scan_fname, ms); fprintf(stderr, "Consider increasing the frequency of THP scanning,\n"); fprintf(stderr, "by echoing a smaller number (e.g. 10) to %s\n", thp_scan_fname); fprintf(stderr, "to more agressively (re)construct THPs. For example:\n"); --- 711,723 ---- int bytes = read(fd, buf, BUF_SIZE); close(fd); if (bytes > 0) { char *p = buf; ! CONVERT_DIGITS_TO_NUM(p, ms); } if (ms > 100) { fprintf(stderr, "\n"); ! numad_log(LOG_NOTICE, "Looks like transparent hugepage scan time in %s is %d ms.\n", thp_scan_fname, ms); ! fprintf(stderr, "Looks like transparent hugepage scan time in %s is %d ms.\n", thp_scan_fname, ms); fprintf(stderr, "Consider increasing the frequency of THP scanning,\n"); fprintf(stderr, "by echoing a smaller number (e.g. 10) to %s\n", thp_scan_fname); fprintf(stderr, "to more agressively (re)construct THPs. For example:\n"); *************** *** 583,588 **** --- 728,761 ---- } + int get_daemon_pid() { + int fd = open(VAR_RUN_FILE, O_RDONLY, 0); + if (fd < 0) { + return 0; + } + char buf[BUF_SIZE]; + int bytes = read(fd, buf, BUF_SIZE); + close(fd); + if (bytes > 0) { + int pid; + char *p = buf; + CONVERT_DIGITS_TO_NUM(p, pid); + // Check run file pid still active + char fname[FNAME_SIZE]; + snprintf(fname, FNAME_SIZE, "/proc/%d", pid); + if (access(fname, F_OK) < 0) { + if (errno == ENOENT) { + numad_log(LOG_NOTICE, "Removing out-of-date numad run file because %s doesn't exist\n", fname); + unlink(VAR_RUN_FILE); + } + return 0; + } + // Daemon must be running already. + return pid; + } + } + + int register_numad_pid() { int pid; char buf[BUF_SIZE]; *************** *** 594,641 **** sprintf(buf, "%d\n", pid); write(fd, buf, strlen(buf)); close(fd); ! numad_log("Registering numad version %s PID %d\n", VERSION_STRING, pid); ! return 1; } if (errno == EEXIST) { ! fd = open(VAR_RUN_FILE, O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); if (fd < 0) { goto fail_numad_run_file; } int bytes = read(fd, buf, BUF_SIZE); close(fd); if (bytes > 0) { - // convert consecutive digits to a number char *p = buf; ! pid = *p++ - '0'; ! while (isdigit(*p)) { ! pid *= 10; ! pid += (*p++ - '0'); ! } // Check pid in run file still active char fname[FNAME_SIZE]; snprintf(fname, FNAME_SIZE, "/proc/%d", pid); if (access(fname, F_OK) < 0) { if (errno == ENOENT) { // Assume run file is out-of-date... ! numad_log("Removing out-of-date numad run file because %s doesn't exist\n", fname); unlink(VAR_RUN_FILE); goto create_run_file; } } // Daemon must be running already. ! return 0; } - } fail_numad_run_file: ! perror("Cannot open numad.pid file"); exit(EXIT_FAILURE); - - } - - - void sigint_handler(int sig) { } --- 767,803 ---- sprintf(buf, "%d\n", pid); write(fd, buf, strlen(buf)); close(fd); ! numad_log(LOG_NOTICE, "Registering numad version %s PID %d\n", VERSION_STRING, pid); ! return pid; } if (errno == EEXIST) { ! fd = open(VAR_RUN_FILE, O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); if (fd < 0) { goto fail_numad_run_file; } int bytes = read(fd, buf, BUF_SIZE); close(fd); if (bytes > 0) { char *p = buf; ! CONVERT_DIGITS_TO_NUM(p, pid); // Check pid in run file still active char fname[FNAME_SIZE]; snprintf(fname, FNAME_SIZE, "/proc/%d", pid); if (access(fname, F_OK) < 0) { if (errno == ENOENT) { // Assume run file is out-of-date... ! numad_log(LOG_NOTICE, "Removing out-of-date numad run file because %s doesn't exist\n", fname); unlink(VAR_RUN_FILE); goto create_run_file; } } // Daemon must be running already. ! return pid; } } fail_numad_run_file: ! numad_log(LOG_CRIT, "Cannot open numad.pid file\n"); exit(EXIT_FAILURE); } *************** *** 648,664 **** } - void bind_self_to_cpu(int cpu_id, size_t cpu_set_size, cpu_set_t *cpu_set_p) { - CPU_ZERO_S(cpu_set_size, cpu_set_p); - CPU_SET_S(cpu_id, cpu_set_size, cpu_set_p); - if (sched_setaffinity(0, cpu_set_size, cpu_set_p) < 0) { - perror("sched_setaffinity"); - exit(EXIT_FAILURE); - } - sched_yield(); - } - - int get_num_cpus() { int n = sysconf(_SC_NPROCESSORS_CONF); int n2 = sysconf(_SC_NPROCESSORS_ONLN); --- 810,815 ---- *************** *** 673,679 **** int huge_page_size = 0;; FILE *fs = fopen("/proc/meminfo", "r"); if (!fs) { ! perror("Can't open /proc/meminfo"); exit(EXIT_FAILURE); } char buf[BUF_SIZE]; --- 824,830 ---- int huge_page_size = 0;; FILE *fs = fopen("/proc/meminfo", "r"); if (!fs) { ! numad_log(LOG_CRIT, "Can't open /proc/meminfo\n"); exit(EXIT_FAILURE); } char buf[BUF_SIZE]; *************** *** 721,727 **** goto fail_proc_uptime; } } ! // convert consecutive digits to a number tmpul[ix] = *p++ - '0'; while (isdigit(*p)) { tmpul[ix] *= 10; --- 872,878 ---- goto fail_proc_uptime; } } ! // convert consecutive digits to a number, ignoring decimal point thus scaling by 100 tmpul[ix] = *p++ - '0'; while (isdigit(*p)) { tmpul[ix] *= 10; *************** *** 736,742 **** } return tmpul[0]; fail_proc_uptime: ! perror("Cannot get /proc/uptime contents"); exit(EXIT_FAILURE); } --- 887,893 ---- } return tmpul[0]; fail_proc_uptime: ! numad_log(LOG_CRIT, "Cannot get /proc/uptime contents\n"); exit(EXIT_FAILURE); } *************** *** 751,769 **** void show_nodes() { for (int ix = 0; (ix < num_nodes); ix++) { ! numad_log("Node %d: MBs_total %ld, MBs_free %ld, CPUs_total %ld, CPUs_free %ld, Distance: ", ix, node[ix].MBs_total, node[ix].MBs_free, node[ix].CPUs_total, node[ix].CPUs_free); for (int d = 0; (d < num_nodes); d++) { ! numad_log("%d ", node[ix].distance[d]); } char buf[BUF_SIZE]; str_from_id_list(buf, BUF_SIZE, node[ix].cpu_list_p); ! numad_log(" CPUs: %s\n", buf); } ! numad_log("\n"); } --- 902,1040 ---- + int bind_process_and_migrate_memory(int pid, char *cpuset_name, id_list_p node_list_p, id_list_p cpu_list_p) { + // Check basic parameter validity. + if (pid <= 0) { + numad_log(LOG_CRIT, "Bad PID to bind\n"); + exit(EXIT_FAILURE); + } + if ((cpuset_name == NULL) || (strlen(cpuset_name) == 0)) { + numad_log(LOG_CRIT, "Bad cpuset name to bind\n"); + exit(EXIT_FAILURE); + } + int nodes; + if ((node_list_p == NULL) || ((nodes = NUM_IDS_IN_LIST(node_list_p)) == 0)) { + numad_log(LOG_CRIT, "Cannot bind to unspecified node\n"); + exit(EXIT_FAILURE); + } + // Cpu_list_p is optional and may be NULL... + // Generate CPU id list from the specified node list if necessary + if (cpu_list_p == NULL) { + static id_list_p tmp_cpu_list_p; + CLEAR_LIST(tmp_cpu_list_p); + int node_id = 0; + while (nodes) { + if (ID_IS_IN_LIST(node_id, node_list_p)) { + OR_LISTS(tmp_cpu_list_p, tmp_cpu_list_p, node[node_id].cpu_list_p); + nodes -= 1; + } + node_id += 1; + } + cpu_list_p = tmp_cpu_list_p; + } + // Make the cpuset directory if necessary + char cpuset_name_buf[FNAME_SIZE]; + snprintf(cpuset_name_buf, FNAME_SIZE, "%s%s", cpuset_dir, cpuset_name); + char *p = &cpuset_name_buf[strlen(cpuset_dir)]; + if (!strcmp(p, "/")) { + // Make a cpuset directory for this process + snprintf(cpuset_name_buf, FNAME_SIZE, "%s/numad.%d", cpuset_dir, pid); + numad_log(LOG_NOTICE, "Making new cpuset: %s\n", cpuset_name_buf); + int rc = mkdir(cpuset_name_buf, S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + if (rc == -1) { + numad_log(LOG_CRIT, "Bad cpuset mkdir\n"); + exit(EXIT_FAILURE); + } + } + cpuset_name = cpuset_name_buf; + // Now that we have a cpuset for pid and a populated cpulist, + // start the actual binding and migration. + uint64_t t0 = get_uptime(NULL); + // Write CPU IDs out to cpuset.cpus file + char fname[FNAME_SIZE]; + char id_list_buf[BUF_SIZE]; + snprintf(fname, FNAME_SIZE, "%s/cpuset.cpus", cpuset_name); + int fd = open(fname, O_WRONLY | O_TRUNC, 0); + if (fd == -1) { + numad_log(LOG_CRIT, "Could not open cpuset.cpus\n"); + exit(EXIT_FAILURE); + } + int len = str_from_id_list(id_list_buf, BUF_SIZE, cpu_list_p); + write(fd, id_list_buf, len); + close(fd); + // Write node IDs out to cpuset.mems file + snprintf(fname, FNAME_SIZE, "%s/cpuset.mems", cpuset_name); + fd = open(fname, O_WRONLY | O_TRUNC, 0); + if (fd == -1) { + numad_log(LOG_CRIT, "Could not open cpuset.mems\n"); + exit(EXIT_FAILURE); + } + len = str_from_id_list(id_list_buf, BUF_SIZE, node_list_p); + write(fd, id_list_buf, len); + close(fd); + // Write "1" out to cpuset.memory_migrate file + snprintf(fname, FNAME_SIZE, "%s/cpuset.memory_migrate", cpuset_name); + fd = open(fname, O_WRONLY | O_TRUNC, 0); + if (fd == -1) { + numad_log(LOG_CRIT, "Could not open cpuset.memory_migrate\n"); + exit(EXIT_FAILURE); + } + write(fd, "1", 1); + close(fd); + + // Copy pid tasks one at a time to tasks file + snprintf(fname, FNAME_SIZE, "%s/tasks", cpuset_name); + fd = open(fname, O_WRONLY | O_TRUNC, 0); + if (fd == -1) { + numad_log(LOG_CRIT, "Could not open tasks\n"); + exit(EXIT_FAILURE); + } + snprintf(fname, FNAME_SIZE, "/proc/%d/task", pid); + struct dirent **namelist; + int files = scandir(fname, &namelist, name_starts_with_digit, NULL); + if (files < 0) { + numad_log(LOG_WARNING, "Could not scandir task list\n"); + return 0; // Assume the process terminated + } + for (int ix = 0; (ix < files); ix++) { + // copy pid tasks, one at a time + numad_log(LOG_NOTICE, "Including task: %s\n", namelist[ix]->d_name); + write(fd, namelist[ix]->d_name, strlen(namelist[ix]->d_name)); + } + close(fd); + + uint64_t t1 = get_uptime(NULL); + // Check pid still active + snprintf(fname, FNAME_SIZE, "/proc/%d", pid); + if (access(fname, F_OK) < 0) { + numad_log(LOG_WARNING, "Could not migrate pid\n"); + return 0; // Assume the process terminated + } + numad_log(LOG_NOTICE, "PID %d moved to node(s) %s in %d.%d seconds\n", pid, id_list_buf, (t1-t0)/100, (t1-t0)%100); + return 1; + } + + + + + void show_nodes() { + time_t ts = time(NULL); + fprintf(log_fs, "%s", ctime(&ts)); + fprintf(log_fs, "Nodes: %d\n", num_nodes); for (int ix = 0; (ix < num_nodes); ix++) { ! fprintf(log_fs, "Node %d: MBs_total %ld, MBs_free %ld, CPUs_total %ld, CPUs_free %ld, Distance: ", ix, node[ix].MBs_total, node[ix].MBs_free, node[ix].CPUs_total, node[ix].CPUs_free); for (int d = 0; (d < num_nodes); d++) { ! fprintf(log_fs, "%d ", node[ix].distance[d]); } char buf[BUF_SIZE]; str_from_id_list(buf, BUF_SIZE, node[ix].cpu_list_p); ! fprintf(log_fs, " CPUs: %s\n", buf); } ! fprintf(log_fs, "\n"); ! fflush(log_fs); } *************** *** 801,807 **** } #else if ( (buf[0] == 'c') && (buf[1] == 'p') && (buf[2] == 'u') && (isdigit(buf[3])) ) { ! char *p = &buf[3]; int cpu_id = *p++ - '0'; while (isdigit(*p)) { cpu_id *= 10; cpu_id += (*p++ - '0'); } while (!isdigit(*p)) { p++; } while (isdigit(*p)) { p++; } // skip user while (!isdigit(*p)) { p++; } while (isdigit(*p)) { p++; } // skip nice --- 1072,1078 ---- } #else if ( (buf[0] == 'c') && (buf[1] == 'p') && (buf[2] == 'u') && (isdigit(buf[3])) ) { ! register char *p = &buf[3]; int cpu_id = *p++ - '0'; while (isdigit(*p)) { cpu_id *= 10; cpu_id += (*p++ - '0'); } while (!isdigit(*p)) { p++; } while (isdigit(*p)) { p++; } // skip user while (!isdigit(*p)) { p++; } while (isdigit(*p)) { p++; } // skip nice *************** *** 816,822 **** cur_cpu_data_buf = new; return (cpu_data_buf[1 - new].uptime > 0); // True if both buffers valid fail_proc_stat: ! perror("Cannot get /proc/stat contents"); exit(EXIT_FAILURE); } --- 1087,1093 ---- cur_cpu_data_buf = new; return (cpu_data_buf[1 - new].uptime > 0); // True if both buffers valid fail_proc_stat: ! numad_log(LOG_CRIT, "Cannot get /proc/stat contents\n"); exit(EXIT_FAILURE); } *************** *** 839,845 **** fclose(fs); // Assign the node id if (num_nodes >= MAX_NODES) { ! perror("NODE table too small"); exit(EXIT_FAILURE); } node[num_nodes].node_id = num_nodes; --- 1110,1116 ---- fclose(fs); // Assign the node id if (num_nodes >= MAX_NODES) { ! numad_log(LOG_CRIT, "NODE table too small\n"); exit(EXIT_FAILURE); } node[num_nodes].node_id = num_nodes; *************** *** 853,860 **** snprintf(fname, FNAME_SIZE, "/sys/devices/system/node/node%d/distance", num_nodes); fs = fopen(fname, "r"); if (!fs) { ! perror("Unexpected fopen() failure while getting node distance data"); ! break; } int rnode = 0; get_next_latf_line: --- 1124,1131 ---- snprintf(fname, FNAME_SIZE, "/sys/devices/system/node/node%d/distance", num_nodes); fs = fopen(fname, "r"); if (!fs) { ! numad_log(LOG_CRIT, "Unexpected fopen() failure while getting node distance data\n"); ! exit(EXIT_FAILURE); } int rnode = 0; get_next_latf_line: *************** *** 867,878 **** goto get_next_latf_line; } } ! // convert consecutive digits to a latency factor ! int latf = *p++ - '0'; ! while (isdigit(*p)) { ! latf *= 10; ! latf += (*p++ - '0'); ! } node[num_nodes].distance[rnode++] = latf; } } --- 1138,1145 ---- goto get_next_latf_line; } } ! int latf; ! CONVERT_DIGITS_TO_NUM(p, latf); node[num_nodes].distance[rnode++] = latf; } } *************** *** 881,888 **** snprintf(fname, FNAME_SIZE, "/sys/devices/system/node/node%d/meminfo", num_nodes); fs = fopen(fname, "r"); if (!fs) { ! perror("Unexpected fopen() failure while getting node meminfo"); ! break; } int MemTotal = 0; int MemFree = 0; --- 1148,1155 ---- snprintf(fname, FNAME_SIZE, "/sys/devices/system/node/node%d/meminfo", num_nodes); fs = fopen(fname, "r"); if (!fs) { ! numad_log(LOG_CRIT, "Unexpected fopen() failure while getting node meminfo\n"); ! exit(EXIT_FAILURE); } int MemTotal = 0; int MemFree = 0; *************** *** 908,919 **** goto get_next_mem_line; } } ! // convert consecutive digits to a number ! uint64_t KB = *p++ - '0'; ! while (isdigit(*p)) { ! KB *= 10; ! KB += (*p++ - '0'); ! } *MB_ptr = KB / 1024; } fclose(fs); --- 1175,1182 ---- goto get_next_mem_line; } } ! uint64_t KB; ! CONVERT_DIGITS_TO_NUM(p, KB); *MB_ptr = KB / 1024; } fclose(fs); *************** *** 944,951 **** node[node_ix].magnitude = node[node_ix].CPUs_free * node[node_ix].MBs_free; } } ! ! if (verbose) { show_nodes(); } return num_nodes; --- 1207,1214 ---- node[node_ix].magnitude = node[node_ix].CPUs_free * node[node_ix].MBs_free; } } ! // FIXME: add code here to calculate a new capacity vector scaled more precisely by node distance ! if (log_level >= LOG_INFO) { show_nodes(); } return num_nodes; *************** *** 952,967 **** } stat_data_p get_stat_data(char *fname) { - static stat_data_t data; - static char comm_buf[BUF_SIZE]; - data.comm = comm_buf; - char buf[BUF_SIZE]; FILE *fs = fopen(fname, "r"); ! if ((!fs) || (!fgets(buf, BUF_SIZE, fs))) { ! numad_log("Could not read stat file: %s\n", fname); return NULL; } int scanf_elements = sscanf(buf, "%d %s %c %d %d %d %d %d %u %lu %lu %lu %lu %lu %lu %ld %ld " "%ld %ld %ld %ld %llu %lu %ld %lu %lu %lu %lu %lu %lu %lu %lu " --- 1215,1239 ---- } + + stat_data_p get_stat_data(char *fname) { FILE *fs = fopen(fname, "r"); ! if (!fs) { ! numad_log(LOG_WARNING, "Could not open stat file: %s\n", fname); ! return NULL; ! } ! static char buf[BUF_SIZE]; ! register char *p = fgets(buf, BUF_SIZE, fs); ! fclose(fs); ! if (!p) { ! numad_log(LOG_WARNING, "Could not read stat file: %s\n", fname); return NULL; } + static stat_data_t data; + #if (NEED_ALL_STAT_DATA) + static char comm_buf[BUF_SIZE]; + data.comm = comm_buf; int scanf_elements = sscanf(buf, "%d %s %c %d %d %d %d %d %u %lu %lu %lu %lu %lu %lu %ld %ld " "%ld %ld %ld %ld %llu %lu %ld %lu %lu %lu %lu %lu %lu %lu %lu " *************** *** 978,987 **** &data.rt_priority, &data.policy, &data.delayacct_blkio_ticks, &data.guest_time, &data.cguest_time); if (scanf_elements < 43) { ! perror ("Could not parse stat file"); return NULL; } ! fclose(fs); return &data; } --- 1250,1278 ---- &data.rt_priority, &data.policy, &data.delayacct_blkio_ticks, &data.guest_time, &data.cguest_time); if (scanf_elements < 43) { ! numad_log(LOG_WARNING, "Could not parse stat file\n"); return NULL; } ! #else ! // Just parse a select few of the data items ! data.pid = *p++ - '0'; while (isdigit(*p)) { data.pid *= 10; data.pid += (*p++ - '0'); } ! while (*p == ' ') { p++; } ! data.comm = p; while (*p != ' ') { p++; } ! *p++ = '\0'; ! for (int ix = 0; (ix < 11); ix++) { while (*p != ' ') { p++; } while (*p == ' ') { p++; } } ! data.utime = *p++ - '0'; while (isdigit(*p)) { data.utime *= 10; data.utime += (*p++ - '0'); } ! while (*p == ' ') { p++; } ! data.stime = *p++ - '0'; while (isdigit(*p)) { data.stime *= 10; data.stime += (*p++ - '0'); } ! while (*p == ' ') { p++; } ! for (int ix = 0; (ix < 4); ix++) { while (*p != ' ') { p++; } while (*p == ' ') { p++; } } ! data.num_threads = *p++ - '0'; while (isdigit(*p)) { data.num_threads *= 10; data.num_threads += (*p++ - '0'); } ! while (*p == ' ') { p++; } ! for (int ix = 0; (ix < 3); ix++) { while (*p != ' ') { p++; } while (*p == ' ') { p++; } } ! data.rss = *p++ - '0'; while (isdigit(*p)) { data.rss *= 10; data.rss += (*p++ - '0'); } ! while (*p == ' ') { p++; } ! for (int ix = 0; (ix < 14); ix++) { while (*p != ' ') { p++; } while (*p == ' ') { p++; } } ! data.processor = *p++ - '0'; while (isdigit(*p)) { data.processor *= 10; data.processor += (*p++ - '0'); } ! #endif return &data; } *************** *** 998,1007 **** struct dirent **namelist; int files = scandir("/proc", &namelist, name_starts_with_digit, NULL); if (files < 0) { ! perror ("Could not open /proc"); } ! if (verbose) { ! numad_log("Processes: %d\n", files); } for (int ix = 0; (ix < files); ix++) { stat_data_p sdata_p; --- 1287,1297 ---- struct dirent **namelist; int files = scandir("/proc", &namelist, name_starts_with_digit, NULL); if (files < 0) { ! numad_log(LOG_CRIT, "Could not open /proc\n"); ! exit(EXIT_FAILURE); } ! if (log_level >= LOG_INFO) { ! numad_log(LOG_INFO, "Processes: %d\n", files); } for (int ix = 0; (ix < files); ix++) { stat_data_p sdata_p; *************** *** 1024,1043 **** // printf("PID: %d %s %d %lu\n", sdata_p->pid, sdata_p->comm, sdata_p->num_threads, sdata_p->vsize); if (1) { pdata.pid = sdata_p->pid; ! pdata.pgrp = sdata_p->pgrp; ! pdata.comm_name = sdata_p->comm; pdata.utime = sdata_p->utime; pdata.stime = sdata_p->stime; ! pdata.priority = sdata_p->priority; ! pdata.nice = sdata_p->nice; pdata.num_threads = sdata_p->num_threads; ! pdata.starttime = sdata_p->starttime; ! pdata.vsize_MBs = sdata_p->vsize / MEGABYTE; pdata.rss_MBs = (sdata_p->rss * page_size_in_bytes) / MEGABYTE; pdata.processor = sdata_p->processor; ! pdata.rt_priority = sdata_p->rt_priority; ! pdata.policy = sdata_p->policy; ! pdata.guest_time = sdata_p->guest_time; process_hash_update(&pdata); } } --- 1314,1333 ---- // printf("PID: %d %s %d %lu\n", sdata_p->pid, sdata_p->comm, sdata_p->num_threads, sdata_p->vsize); if (1) { pdata.pid = sdata_p->pid; ! // pdata.pgrp = sdata_p->pgrp; ! pdata.comm = sdata_p->comm; pdata.utime = sdata_p->utime; pdata.stime = sdata_p->stime; ! // pdata.priority = sdata_p->priority; ! // pdata.nice = sdata_p->nice; pdata.num_threads = sdata_p->num_threads; ! // pdata.starttime = sdata_p->starttime; ! // pdata.vsize_MBs = sdata_p->vsize / MEGABYTE; pdata.rss_MBs = (sdata_p->rss * page_size_in_bytes) / MEGABYTE; pdata.processor = sdata_p->processor; ! // pdata.rt_priority = sdata_p->rt_priority; ! // pdata.policy = sdata_p->policy; ! // pdata.guest_time = sdata_p->guest_time; process_hash_update(&pdata); } } *************** *** 1056,1067 **** if (access(fname, F_OK) < 0) { // Seems dead. Forget this pid -- after first checking // and removing obsolete numad.PID cpuset directories. ! snprintf(fname, FNAME_SIZE, "%s/numad.%d", CPUSET_DIR, p->pid); if (access(fname, F_OK) == 0) { ! numad_log("Removing obsolete cpuset: %s\n", fname); int rc = rmdir(fname); if (rc == -1) { ! perror("bad cpuset rmdir"); // exit(EXIT_FAILURE); } } --- 1346,1357 ---- if (access(fname, F_OK) < 0) { // Seems dead. Forget this pid -- after first checking // and removing obsolete numad.PID cpuset directories. ! snprintf(fname, FNAME_SIZE, "%s/numad.%d", cpuset_dir, p->pid); if (access(fname, F_OK) == 0) { ! numad_log(LOG_NOTICE, "Removing obsolete cpuset: %s\n", fname); int rc = rmdir(fname); if (rc == -1) { ! numad_log(LOG_ERR, "bad cpuset rmdir\n"); // exit(EXIT_FAILURE); } } *************** *** 1076,1087 **** void show_processes(process_data_p *ptr, int nprocs) { ! numad_log("Candidates: %d\n", nprocs); for (int ix = 0; (ix < nprocs); ix++) { process_data_p p = ptr[ix]; char buf[BUF_SIZE]; ! snprintf(buf, BUF_SIZE, "%s%s/cpuset.mems", CPUSET_DIR, p->cpuset_name); FILE *fs = fopen(buf, "r"); buf[0] = '\0'; if (fs) { --- 1366,1661 ---- + + + id_list_p pick_numa_nodes(int pid, int cpus, int mbs) { + int pid_ix; + process_data_p p = NULL; + int cpuset_name_len = 0; + static id_list_p existing_mems_list_p; + uint64_t process_MBs[MAX_NODES]; + uint64_t process_CPUs[MAX_NODES]; + CLEAR_LIST(existing_mems_list_p); + memset(process_MBs, 0, sizeof(process_MBs)); + memset(process_CPUs, 0, sizeof(process_CPUs)); + if (log_level >= LOG_DEBUG) { + numad_log(LOG_DEBUG, "PICK NODES FOR: PID: %d, CPUs %d, MBs %d\n", pid, cpus, mbs); + } + // For existing processes, get miscellaneous process specific details + if ((pid > 0) && ((pid_ix = process_hash_lookup(pid)) >= 0)) { + p = &process_hash_table[pid_ix]; + char buf[BUF_SIZE]; + char fname[FNAME_SIZE]; + // First get cpuset name for this process, and existing mems binding, if any. + snprintf(fname, FNAME_SIZE, "/proc/%d/cpuset", pid); + FILE *fs = fopen(fname, "r"); + if (!fs) { + numad_log(LOG_WARNING, "Tried to research PID %d cpuset, but it apparently went away.\n", p->pid); + return NULL; // Assume the process terminated? + } + if (!fgets(buf, BUF_SIZE, fs)) { + numad_log(LOG_WARNING, "Tried to research PID %d cpuset, but it apparently went away.\n", p->pid); + // FIXME: open file leak + return NULL; // Assume the process terminated? + } + fclose(fs); + ELIM_NEW_LINE(buf); + if ((!p->cpuset_name) || (strcmp(p->cpuset_name, buf))) { + if (p->cpuset_name != NULL) { + free(p->cpuset_name); + } + p->cpuset_name = strdup(buf); + } + cpuset_name_len = strlen(p->cpuset_name); + if (log_level >= LOG_DEBUG) { + numad_log(LOG_DEBUG, "CPUSET_NAME: %s\n", p->cpuset_name); + } + + int num_existing_mems = 0; + snprintf(fname, FNAME_SIZE, "%s%s/cpuset.mems", cpuset_dir, p->cpuset_name); + fs = fopen(fname, "r"); + if ((fs) && (fgets(buf, BUF_SIZE, fs))) { + fclose(fs); + num_existing_mems = add_ids_to_list_from_str(existing_mems_list_p, buf); + if (log_level >= LOG_DEBUG) { + // FIXME: just print buf w/o exercising conversion routines + str_from_id_list(buf, BUF_SIZE, existing_mems_list_p); + numad_log(LOG_DEBUG, "EXISTING NODE LIST: %s\n", buf); + } + } + + // If we have bound this process to the same nodes multiple times + // already, and the load on those nodes seems acceptable, skip the rest + // of this and just return NULL to indicate no change needed. FIXME: + // figure out what else can change that should cause a rebinding (e.g. + // (1) some process gets sub-optimal allocation on busy machine which + // subsequently becomes less busy leaving disadvantaged process. (2) + // node load imbalance, (3) any process split across nodes with should + // fit within a single node.) + if (p->dup_bind_count > 1) { + int node_id = 0; + int nodes_have_cpu = 1; + int nodes_have_ram = 1; + int n = num_existing_mems; + while (n) { + if (ID_IS_IN_LIST(node_id, existing_mems_list_p)) { + nodes_have_cpu &= ((100 * node[node_id].CPUs_free / node[node_id].CPUs_total) >= (100 - target_utilization)); + nodes_have_ram &= ((100 * node[node_id].MBs_free / node[node_id].MBs_total) >= (100 - target_utilization)); + n -= 1; + } + node_id += 1; + } + if ((nodes_have_cpu) && (nodes_have_ram)) { + if (log_level >= LOG_DEBUG) { + numad_log(LOG_DEBUG, "Skipping evaluation because of repeat binding\n", pid, cpus, mbs); + } + return NULL; + } + } + + + // FIXME: this scanning is expensive and must be minimized + + + // Second, add up per-node memory in use by this process + snprintf(fname, FNAME_SIZE, "/proc/%d/numa_maps", pid); + fs = fopen(fname, "r"); + if (!fs) { + numad_log(LOG_WARNING, "Tried to research PID %d numamaps, but it apparently went away.\n", p->pid); + return NULL; // Assume the process terminated? + } + while (fgets(buf, BUF_SIZE, fs)) { + uint64_t page_size = page_size_in_bytes; + const char *delimiters = " \t\r\n"; + char *p = strtok(buf, delimiters); + while (p) { + if (!strcmp(p, "huge")) { + page_size = huge_page_size_in_bytes; + } else if (p[0] == 'N') { + int node = (int)strtol(&p[1], &p, 10); + if (p[0] != '=') { + numad_log(LOG_CRIT, "numa_maps node number parse error\n"); + exit(EXIT_FAILURE); + } + uint64_t pages = strtol(&p[1], &p, 10); + process_MBs[node] += (pages * page_size); + } + // Get next token on the line + p = strtok(NULL, delimiters); + } + } + for (int ix = 0; (ix < num_nodes); ix++) { + process_MBs[ix] /= MEGABYTE; + if (log_level >= LOG_DEBUG) { + numad_log(LOG_DEBUG, "PROCESS_MBs[%d]: %ld\n", ix, process_MBs[ix]); + } + } + fclose(fs); + + // Third, add up per-node CPUs recently used by this process + snprintf(fname, FNAME_SIZE, "/proc/%d/task", pid); + struct dirent **namelist; + int files = scandir(fname, &namelist, name_starts_with_digit, NULL); + if (files < 0) { + numad_log(LOG_WARNING, "Tried to research PID %d tasks, but it apparently went away.\n", p->pid); + return NULL; // Assume the process terminated? + } + for (int ix = 0; (ix < files); ix++) { + snprintf(fname, FNAME_SIZE, "/proc/%d/task/%s/stat", pid, namelist[ix]->d_name); + stat_data_p sdata_p = get_stat_data(fname); + if (sdata_p != NULL) { + process_CPUs[node_from_cpu[sdata_p->processor]] += 1; + } + } + for (int ix = 0; (ix < num_nodes); ix++) { + // Assume average load per thread + process_CPUs[ix] *= (p->CPUs_used / p->num_threads); + if (log_level >= LOG_DEBUG) { + numad_log(LOG_DEBUG, "PROCESS_CPUs[%d]: %ld\n", ix, process_CPUs[ix]); + } + } + } + + + + // Make a copy of node available resources array + static node_data_p tmp_node; // FIXME: what if num_nodes changes after allocate + if (tmp_node == NULL) { + tmp_node = malloc(num_nodes * sizeof(node_data_t) ); + if (tmp_node == NULL) { + numad_log(LOG_CRIT, "malloc failed\n"); + exit(EXIT_FAILURE); + } + } + memcpy(tmp_node, node, num_nodes * sizeof(node_data_t) ); + + // Add in the info specific to this process to equalize available resource + // quantities wrt locations of resources already in use by this process + for (int ix = 0; (ix < num_nodes); ix++) { + tmp_node[ix].MBs_free += process_MBs[ix]; + if (EQUAL_LISTS(existing_mems_list_p, all_nodes_list_p)) { + // If not already bound, consider only available memory for now (by marking all CPUs free) + tmp_node[ix].CPUs_free = tmp_node[ix].CPUs_total;; + } else { + // If already bound, consider existing location of CPUs + tmp_node[ix].CPUs_free += process_CPUs[ix]; + } + if (tmp_node[ix].CPUs_free > tmp_node[ix].CPUs_total) { + tmp_node[ix].CPUs_free = tmp_node[ix].CPUs_total; + } + // Calculate magnitude clearly biased towards existing location of memory + tmp_node[ix].magnitude = tmp_node[ix].CPUs_free * (tmp_node[ix].MBs_free + ((1 * process_MBs[ix]) / 2)); + } + + static id_list_p target_node_list_p; + CLEAR_LIST(target_node_list_p); + int prev_node = -1; + + // Allocate sufficient resources + while ((mbs > 0) || (cpus > 20)) { + + // First, sort nodes by magnitude + for (int ij = 0; (ij < num_nodes); ij++) { + int big_ix = ij; + for (int ik = ij; (ik < num_nodes); ik++) { + if (tmp_node[big_ix].magnitude < tmp_node[ik].magnitude) { + big_ix = ik; + } + } + if (big_ix != ij) { + node_data_t tmp; + memcpy((void *)&tmp, (void *)&tmp_node[ij], sizeof(node_data_t) ); + memcpy((void *)&tmp_node[ij], (void *)&tmp_node[big_ix], sizeof(node_data_t) ); + memcpy((void *)&tmp_node[big_ix], (void *)&tmp, sizeof(node_data_t) ); + } + } + + if (log_level >= LOG_DEBUG) { + for (int ix = 0; (ix < num_nodes); ix++) { + numad_log(LOG_DEBUG, "Sorted magnitude[%d]: %ld\n", tmp_node[ix].node_id, tmp_node[ix].magnitude); + } + } + + if (tmp_node[0].node_id == prev_node) { + // Not going to make progress... Just use everything + OR_LISTS(target_node_list_p, target_node_list_p, all_nodes_list_p); + break; + } + prev_node = tmp_node[0].node_id; + + ADD_ID_TO_LIST(tmp_node[0].node_id, target_node_list_p); + if (EQUAL_LISTS(target_node_list_p, all_nodes_list_p)) { + break; // Apparently must use all resource nodes... + } + + + #define MBS_MARGIN 10 + if (tmp_node[0].MBs_free >= (mbs + MBS_MARGIN)) { + tmp_node[0].MBs_free -= mbs; + mbs = 0; + } else { + mbs -= (tmp_node[0].MBs_free - MBS_MARGIN); + tmp_node[0].MBs_free = MBS_MARGIN; + } + #define CPUS_MARGIN 0 + if (tmp_node[0].CPUs_free >= (cpus + CPUS_MARGIN)) { + tmp_node[0].CPUs_free -= cpus; + cpus = 0; + } else { + cpus -= (tmp_node[0].CPUs_free - CPUS_MARGIN); + tmp_node[0].CPUs_free = CPUS_MARGIN; + } + tmp_node[0].magnitude = tmp_node[0].CPUs_free * tmp_node[0].MBs_free; + + + // FIXME + // adjust all the magnitudes by the distance in very sketchy way + for (int ix = 1; (ix < num_nodes); ix++) { + tmp_node[ix].magnitude /= tmp_node[0].distance[tmp_node[ix].node_id]; + } + } + + // If this existing process is already located where we want it, and almost + // all memory is already moved to those nodes, then return NULL indicating + // no need to change binding this time. + if ((pid > 0) && (EQUAL_LISTS(target_node_list_p, existing_mems_list_p))) { + // May not need to change binding. However, if there is any significant + // memory still on non-target nodes, advise the bind anyway. + p->dup_bind_count += 1; + for (int ix = 0; (ix < num_nodes); ix++) { + if ((process_MBs[ix] > 10) && (!ID_IS_IN_LIST(ix, target_node_list_p))) { + goto advise_bind; + } + } + return NULL; + } else { + if (p != NULL) { + p->dup_bind_count = 0; + } + } + + char buf1[BUF_SIZE]; + char buf2[BUF_SIZE]; + + advise_bind: + + str_from_id_list(buf1, BUF_SIZE, existing_mems_list_p); + str_from_id_list(buf2, BUF_SIZE, target_node_list_p); + numad_log(LOG_NOTICE, "Advising pid %d move from nodes (%s) to nodes (%s)\n", pid, buf1, buf2); + + return target_node_list_p; + } + + + void show_processes(process_data_p *ptr, int nprocs) { ! time_t ts = time(NULL); ! fprintf(log_fs, "%s", ctime(&ts)); ! fprintf(log_fs, "Candidates: %d\n", nprocs); for (int ix = 0; (ix < nprocs); ix++) { process_data_p p = ptr[ix]; char buf[BUF_SIZE]; ! snprintf(buf, BUF_SIZE, "%s%s/cpuset.mems", cpuset_dir, p->cpuset_name); FILE *fs = fopen(buf, "r"); buf[0] = '\0'; if (fs) { *************** *** 1090,1099 **** } fclose(fs); } ! numad_log("PID %d: MBs_used %ld, CPUs_used %ld, Threads %ld, Uptime %ld, Name: %s, Nodes: %s\n", ! p->pid, p->MBs_used, p->CPUs_used, p->num_threads, p->uptime, p->comm_name, buf); } ! numad_log("\n"); } --- 1664,1674 ---- } fclose(fs); } ! fprintf(log_fs, "PID %d: MBs_used %ld, CPUs_used %ld, Threads %ld, Uptime %ld, Name: %s, Nodes: %s\n", ! p->pid, p->MBs_used, p->CPUs_used, p->num_threads, p->uptime, p->comm, buf); } ! fprintf(log_fs, "\n"); ! fflush(log_fs); } *************** *** 1109,1119 **** pindex[nprocs++] = p; } } ! ! if (verbose) { show_processes(pindex, nprocs); } ! // Return maximum interval if no process movement return max_interval; } --- 1684,1730 ---- pindex[nprocs++] = p; } } ! // Sort index by amount of CPU used * amount of memory used. ! // Not expecting a long list here. So just use a simple sort. ! for (int ij = 0; (ij < nprocs); ij++) { ! int best = ij; ! for (int ik = ij; (ik < nprocs); ik++) { ! if ((pindex[ik]->CPUs_used * pindex[ik]->MBs_used) ! <= (pindex[best]->CPUs_used * pindex[best]->MBs_used)) continue; ! best = ik; ! } ! if (best != ij) { ! process_data_p tmp = pindex[ij]; ! pindex[ij] = pindex[best]; ! pindex[best] = tmp; ! } ! } ! if ((log_level >= LOG_INFO) && (nprocs > 0)) { show_processes(pindex, nprocs); } ! // Estimate desired size and make resource requests for each significant process ! for (int ix = 0; (ix < nprocs); ix++) { ! process_data_p p = pindex[ix]; ! if (p->CPUs_used * p->MBs_used < CPU_THRESHOLD * MEMORY_THRESHOLD) { ! break; // No more significant processes worth worrying about... ! } ! int mb_request = (p->MBs_used * 100) / target_utilization; ! int cpu_request = (p->CPUs_used * 100) / target_utilization; ! // Don't give a process more CPUs than it has threads ! int thread_limit = p->num_threads * CPU_SCALE_FACTOR; ! if (cpu_request > thread_limit) { ! cpu_request = thread_limit; ! } ! pthread_mutex_lock(&mutex); ! id_list_p node_list_p = pick_numa_nodes(p->pid, cpu_request, mb_request); ! // FIXME: copy node_list_p to shorten mutex region? ! if ((node_list_p != NULL) && (bind_process_and_migrate_memory(p->pid, p->cpuset_name, node_list_p, NULL))) { ! // Shorten interval if actively moving processes ! pthread_mutex_unlock(&mutex); ! return min_interval; ! } ! pthread_mutex_unlock(&mutex); ! } // Return maximum interval if no process movement return max_interval; } *************** *** 1121,1191 **** int main(int argc, char *argv[]) { int opt; ! while ((opt = getopt(argc, argv, "dhi:qu:vVw:")) != -1) { switch (opt) { ! case 'd': debug = 1; break; case 'h': print_usage_and_exit(argv[0]); break; ! case 'i': max_interval = atoi(optarg); break; ! case 'q': quiet = 1; break; ! case 'u': target_utilization = atoi(optarg); break; ! case 'v': verbose = 1; break; case 'V': print_version_and_exit(argv[0]); break; ! case 'w': { ! char *p = NULL; ! req_cpus = (int)strtol(optarg, &p, 10); ! if (p == optarg) { ! printf("Can't parse req_cpus: %s\n", optarg); ! exit(EXIT_FAILURE); ! } ! if (*p == ':') { ! char *q = p + 1; ! req_mbs = (int)strtol(q, &p, 10); ! if (p == q) { ! printf("Can't parse req_mbs: %s\n", q); ! exit(EXIT_FAILURE); ! } ! } ! break; ! } default: print_usage_and_exit(argv[0]); break; } } if (argc > optind) { ! printf("Unexpected arg = %s\n", argv[optind]); exit(EXIT_FAILURE); } - // Set verbose, if debug set - verbose |= debug; ! ! check_prereqs(argv[0]); num_cpus = get_num_cpus(); page_size_in_bytes = sysconf(_SC_PAGESIZE); huge_page_size_in_bytes = get_huge_page_size_in_bytes(); ! if (!register_numad_pid()) { ! // Daemon already running ! // Send message to persistant thread to handle request ! // and exit ! } ! ! // daemonize self ! // ! // spawn thread to handle messages from subsequent invocation requests ! // ! // execute following loop forever ! for (;;) { ! int interval = max_interval; ! if (update_nodes() > 1) { ! update_processes(); ! interval = manage_loads(); } ! sleep(interval); } exit(EXIT_SUCCESS); --- 1732,1917 ---- + void *set_dynamic_options(void *arg) { + int arg_value = *(int *)arg; + for (;;) { + // Loop here forever waiting for a msg to do something... + msg_t msg; + recv_msg(&msg); + switch (msg.body.cmd) { + case 'i': { + numad_log(LOG_NOTICE, "Changing interval to %d:%d\n", msg.body.arg1, msg.body.arg2); + min_interval = msg.body.arg1; + max_interval = msg.body.arg2; + if (max_interval <= 0) { + shut_down_numad(); + } + break; + } + case 'l': { + numad_log(LOG_NOTICE, "Changing log level to %d\n", msg.body.arg1); + log_level = msg.body.arg1; + break; + } + case 'u': { + numad_log(LOG_NOTICE, "Changing target utilization to %d\n", msg.body.arg1); + target_utilization = msg.body.arg1; + break; + } + case 'w': { + char buf[BUF_SIZE]; + numad_log(LOG_NOTICE, "Getting NUMA advice for %d CPUs and %d MBs\n", msg.body.arg1, msg.body.arg2); + pthread_mutex_lock(&mutex); + id_list_p node_list_p = pick_numa_nodes(-1, (msg.body.arg1 * CPU_SCALE_FACTOR), msg.body.arg2); + str_from_id_list(buf, BUF_SIZE, node_list_p); + pthread_mutex_unlock(&mutex); + send_msg(msg.body.src_pid, 'w', requested_cpus, requested_mbs, buf); + break; + } + default: { + numad_log(LOG_WARNING, "Unexpected msg command: %c %d %d %s from PID %d\n", + msg.body.cmd, msg.body.arg1, msg.body.arg1, msg.body.text, msg.body.src_pid); + break; + } + } + } // for (;;) + } + + + + void parse_two_arg_values(char *p, int *first_ptr, int *second_ptr, int first_is_optional) { + char *orig_p = p; + char *q = NULL; + int second = -1; + int first = (int)strtol(p, &p, 10); + if (p == orig_p) { + fprintf(stderr, "Can't parse arg value(s): %s\n", orig_p); + exit(EXIT_FAILURE); + } + if (*p == ':') { + q = p + 1; + second = (int)strtol(q, &p, 10); + if (p == q) { + fprintf(stderr, "Can't parse arg value(s): %s\n", orig_p); + exit(EXIT_FAILURE); + } + } + if (q != NULL) { + // Two numbers are present + if (first_ptr != NULL) *first_ptr = first; + if (second_ptr != NULL) *second_ptr = second; + } else if (first_is_optional) { + if (second_ptr != NULL) *second_ptr = first; + } else { + if (first_ptr != NULL) *first_ptr = first; + } + } + + int main(int argc, char *argv[]) { int opt; ! char *p = NULL; ! int i_flag = 0; ! int l_flag = 0; ! int u_flag = 0; ! int w_flag = 0; ! while ((opt = getopt(argc, argv, "dhi:l:u:vVw:")) != -1) { switch (opt) { ! case 'd': log_level = LOG_DEBUG ; break; case 'h': print_usage_and_exit(argv[0]); break; ! case 'i': i_flag = 1; parse_two_arg_values(optarg, &min_interval, &max_interval, 1); break; ! case 'l': l_flag = 1; log_level = atoi(optarg); break; ! case 'u': u_flag = 1; target_utilization = atoi(optarg); break; ! case 'v': log_level = LOG_INFO; break; case 'V': print_version_and_exit(argv[0]); break; ! case 'w': w_flag = 1; parse_two_arg_values(optarg, &requested_cpus, &requested_mbs, 0); break; default: print_usage_and_exit(argv[0]); break; } } if (argc > optind) { ! fprintf(stderr, "Unexpected arg = %s\n", argv[optind]); exit(EXIT_FAILURE); } ! open_log_file(); ! init_msg_queue(); num_cpus = get_num_cpus(); page_size_in_bytes = sysconf(_SC_PAGESIZE); huge_page_size_in_bytes = get_huge_page_size_in_bytes(); + // Figure out if this is the daemon, or a subsequent invocation + int daemon_pid = get_daemon_pid(); + if (daemon_pid > 0) { + // Daemon is already running. So send dynamic options to persistant + // thread to handle requests, get the response (if any), and finish. + msg_t msg; + if (i_flag) { + send_msg(daemon_pid, 'i', min_interval, max_interval, ""); + } + if (l_flag) { + send_msg(daemon_pid, 'l', log_level, 0, ""); + } + if (u_flag) { + send_msg(daemon_pid, 'u', target_utilization, 0, ""); + } + if (w_flag) { + send_msg(daemon_pid, 'w', requested_cpus, requested_mbs, ""); + recv_msg(&msg); + fprintf(stdout, "%s\n", msg.body.text); + } + } else { + check_prereqs(argv[0]); + // Daemonize self... + daemon_pid = fork(); + if (daemon_pid < 0) { numad_log(LOG_CRIT, "fork() failed\n"); exit(EXIT_FAILURE); } + // Parent process now exits + if (daemon_pid > 0) { exit(EXIT_SUCCESS); } + // Child process continues... + umask(S_IWGRP | S_IWOTH); // Reset the file mode + int sid = setsid(); // Start a new session + if (sid < 0) { numad_log(LOG_CRIT, "setsid() failed\n"); exit(EXIT_FAILURE); } + if ((chdir("/")) < 0) { numad_log(LOG_CRIT, "chdir() failed"); exit(EXIT_FAILURE); } + daemon_pid = register_numad_pid(); + if (daemon_pid != getpid()) { + numad_log(LOG_CRIT, "Could not register daemon PID\n"); + exit(EXIT_FAILURE); + } + fclose(stdin); + fclose(stdout); + if (log_fs != stderr) { + fclose(stderr); + } ! // spawn thread to handle messages from subsequent invocation requests ! pthread_mutex_init(&mutex, NULL); ! pthread_attr_t attr; ! if (pthread_attr_init(&attr) != 0) { ! numad_log(LOG_CRIT, "pthread_attr_init failure\n"); ! exit(EXIT_FAILURE); ! } ! pthread_t tid; ! if (pthread_create(&tid, &attr, &set_dynamic_options, &tid) != 0) { ! numad_log(LOG_CRIT, "pthread_create failure\n"); ! exit(EXIT_FAILURE); ! } ! // Loop here forwever... ! for (;;) { ! int interval = max_interval; ! pthread_mutex_lock(&mutex); ! int nodes = update_nodes(); ! pthread_mutex_unlock(&mutex); ! if (nodes > 1) { ! update_processes(); ! interval = manage_loads(); ! } ! sleep(interval); ! } ! if (pthread_attr_destroy(&attr) != 0) { ! numad_log(LOG_WARNING, "pthread_attr_destroy failure\n"); } ! pthread_mutex_destroy(&mutex); } exit(EXIT_SUCCESS);