diff --git a/numad.8 b/numad.8 index 818826e..ce4b4fe 100644 --- a/numad.8 +++ b/numad.8 @@ -8,6 +8,9 @@ management for efficient use of CPUs and memory on systems with NUMA topology. numad [\fI\-dhvV\fP] .br .LP +numad [\fI\-C 0|1\fP] +.br +.LP numad [\fI\-D non-standard-cgroup-mount-point\fP] .br .LP @@ -35,6 +38,9 @@ numad [\fI\-R reserved-CPU-list\fP] numad [\fI\-S 0|1\fP] .br .LP +numad [\fI\-t logical_CPU_percent\fP] +.br +.LP numad [\fI\-u target_utilization\fP] .br .LP @@ -59,6 +65,13 @@ accesses will likely remain unpredictable -- numad will probably not improve performance. .SH "OPTIONS" .LP +.TP +\fB\-C\fR <\fI0|1\fP> +This option controls whether or not numad treats inactive file cache as +available memory. By default, numad assumes it can count inactive file cache as +"free" memory when considering resources to match with processes. Specify +\fI\-C 0\fP if numad should instead consider inactive file cache as a consumed +resource. .TP \fB\-d\fR Debug output in log, sets the log level to LOG_DEBUG. Same effect as \fI\-l 7\fP. @@ -74,10 +87,11 @@ Display usage help information and then exit. Set the desired transparent hugepage scan interval in ms. The /sys/kernel/mm/transparent_hugepage/khugepaged/scan_sleep_millisecs tunable is usually set to 10000ms by the operating system. The default is changed by -numad to be 1000ms, since it is helpful for the hugepage daemon to be more +numad to be 1000ms since it is helpful for the hugepage daemon to be more aggressive when memory moves between nodes. If you don't like numad's choice of 1000ms, you can make the hugepage daemon more or less aggressive by -specifying an alternate value with this option. +specifying an alternate value with this option. Setting this value to 100ms +might improve some workloads which use many transparent hugepages. .TP \fB\-i\fR <\fI[min_interval:]max_interval\fP> Sets the time interval that numad waits between system scans, in seconds to @@ -99,7 +113,9 @@ large in-memory database), you might get better results by specifying \fI\-K .TP \fB\-l\fR <\fIlog_level\fP> Sets the log level to <\fIlog_level\fP>. Reasonable choices are 5, 6, or 7. -The default value is 5. +The default value is 5. Note that CPU values are scaled by a factor of 100 +internally and in the numad log files. Unfortunately, you don't actually have +that many CPUs. .TP \fB\-p\fR <\fIPID\fP> Add PID to explicit inclusion list of processes to consider for managing, if @@ -134,10 +150,19 @@ exclusion list). Starting numad as will limit scanning, and thus also automatic NUMA management, to only those three explicitly specified processes. .TP +\fB\-t\fR <\fIlogical_CPU_percent\fP> +Determine the resource value of logical CPUs. Hardware threads typically share +most core resources, and so add only a fraction of CPU power for many +workloads. By default numad considers logical CPUs to be only 20 percent of a +dedicated core. +.TP \fB\-u\fR <\fItarget_utilization\fP> Set the desired maximum consumption percentage of a node. Default is 85%. Decrease the target value to maintain more available resource margin on each node. Increase the target value to more exhaustively consume node resources. +It is possible to specify values up to 130 percent, to oversubscribe CPUs in +the nodes, but memory utilization is capped at 100%. Use oversubscription +values carefully. .TP \fB\-v\fR Verbose output in log, sets the log level to LOG_INFO. Same effect as \fI\-l 6\fP. 
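For reference, the accounting that the new -C option toggles works as sketched below: numad reads each node's meminfo file and, by default, counts Inactive(file) on top of MemFree when sizing "free" node memory (the actual implementation is in update_nodes() further down in this patch). This is a minimal standalone sketch; the sysfs path and field names are the real meminfo format, but the function and variable names are illustrative only, not part of the patch.

/* free_mb.c -- illustrative sketch only; build: gcc -std=gnu99 -o free_mb free_mb.c */
#include <stdio.h>
#include <string.h>

/* Return "available" MBs for a NUMA node, optionally counting
 * Inactive(file) page cache as free -- the behavior -C 1|0 selects. */
long node_free_mbs(int node_id, int use_inactive_file_cache) {
    char fname[128];
    char buf[4096];
    snprintf(fname, sizeof(fname), "/sys/devices/system/node/node%d/meminfo", node_id);
    FILE *fs = fopen(fname, "r");
    if (!fs) { return -1; }
    size_t bytes = fread(buf, 1, sizeof(buf) - 1, fs);
    fclose(fs);
    buf[bytes] = '\0';
    long kb, free_kb = 0, inactive_kb = 0;
    char *p;
    if (((p = strstr(buf, "MemFree:")) != NULL) && (sscanf(p + 8, "%ld", &kb) == 1)) {
        free_kb = kb;
    }
    if (use_inactive_file_cache &&
        ((p = strstr(buf, "Inactive(file):")) != NULL) && (sscanf(p + 15, "%ld", &kb) == 1)) {
        inactive_kb = kb;
    }
    return (free_kb + inactive_kb) / 1024;   /* KB -> MB */
}

int main(void) {
    printf("node0 free, -C 1 behavior: %ld MB\n", node_free_mbs(0, 1));
    printf("node0 free, -C 0 behavior: %ld MB\n", node_free_mbs(0, 0));
    return 0;
}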
@@ -197,4 +222,3 @@ Bill Gray .SH "SEE ALSO" .LP numactl(8) - diff --git a/numad.c b/numad.c index 2d8ae95..7b4b0b5 100644 --- a/numad.c +++ b/numad.c @@ -19,7 +19,7 @@ Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -// Compile with: gcc -std=gnu99 -O -Wall -pthread -o numad numad.c -lrt +// Compile with: gcc -std=gnu99 -g -Wall -pthread -o numad numad.c -lrt -lm #define _GNU_SOURCE @@ -54,7 +54,7 @@ Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA #include -#define VERSION_STRING "20130814" +#define VERSION_STRING "20140225" #define VAR_RUN_FILE "/var/run/numad.pid" @@ -88,14 +88,9 @@ char *cpuset_dir_list[] = { #define MEMORY_THRESHOLD 300 #define THP_SCAN_SLEEP_MS 1000 #define TARGET_UTILIZATION_PERCENT 85 -#define IMPROVEMENT_THRESHOLD_PERCENT 5 +#define DEFAULT_HTT_PERCENT 20 -#define ELIM_NEW_LINE(s) \ - if (s[strlen(s) - 1] == '\n') { \ - s[strlen(s) - 1] = '\0'; \ - } - #define CONVERT_DIGITS_TO_NUM(p, n) \ n = *p++ - '0'; \ while (isdigit(*p)) { \ @@ -106,20 +101,36 @@ char *cpuset_dir_list[] = { int num_cpus = 0; int num_nodes = 0; +int threads_per_core = 0; int page_size_in_bytes = 0; int huge_page_size_in_bytes = 0; int thp_scan_sleep_ms = THP_SCAN_SLEEP_MS; int min_interval = MIN_INTERVAL; int max_interval = MAX_INTERVAL; +int htt_percent = DEFAULT_HTT_PERCENT; int target_utilization = TARGET_UTILIZATION_PERCENT; int scan_all_processes = 1; int keep_interleaved_memory = 0; +int use_inactive_file_cache = 1; pthread_mutex_t pid_list_mutex; pthread_mutex_t node_info_mutex; +long sum_CPUs_total = 0; int requested_mbs = 0; int requested_cpus = 0; +int got_sighup = 0; +int got_sigterm = 0; +int got_sigquit = 0; + + +void sig_handler(int signum) { + switch (signum) { + case SIGHUP: got_sighup = 1; break; + case SIGTERM: got_sigterm = 1; break; + case SIGQUIT: got_sigquit = 1; break; + } +} @@ -163,7 +174,9 @@ void open_log_file() { void close_log_file() { if (log_fs != NULL) { - fclose(log_fs); + if (log_fs != stderr) { + fclose(log_fs); + } log_fs = NULL; } } @@ -235,7 +248,6 @@ void send_msg(long dst_pid, long cmd, long arg1, long arg2, char *s) { } - typedef struct id_list { // Use CPU_SET(3) cpuset bitmasks, // but bundle size and pointer together @@ -244,16 +256,22 @@ typedef struct id_list { size_t bytes; } id_list_t, *id_list_p; -#define INIT_ID_LIST(list_p) \ +#define INIT_ID_LIST(list_p, num_elements) \ list_p = malloc(sizeof(id_list_t)); \ if (list_p == NULL) { numad_log(LOG_CRIT, "INIT_ID_LIST malloc failed\n"); exit(EXIT_FAILURE); } \ - list_p->set_p = CPU_ALLOC(num_cpus); \ + list_p->set_p = CPU_ALLOC(num_elements); \ if (list_p->set_p == NULL) { numad_log(LOG_CRIT, "CPU_ALLOC failed\n"); exit(EXIT_FAILURE); } \ - list_p->bytes = CPU_ALLOC_SIZE(num_cpus); + list_p->bytes = CPU_ALLOC_SIZE(num_elements); + +#define CLEAR_CPU_LIST(list_p) \ + if (list_p == NULL) { \ + INIT_ID_LIST(list_p, num_cpus); \ + } \ + CPU_ZERO_S(list_p->bytes, list_p->set_p) -#define CLEAR_LIST(list_p) \ +#define CLEAR_NODE_LIST(list_p) \ if (list_p == NULL) { \ - INIT_ID_LIST(list_p); \ + INIT_ID_LIST(list_p, num_nodes); \ } \ CPU_ZERO_S(list_p->bytes, list_p->set_p) @@ -264,6 +282,9 @@ typedef struct id_list { list_p = NULL; \ } +#define COPY_LIST(orig_list_p, copy_list_p) \ + memcpy(copy_list_p->set_p, orig_list_p->set_p, orig_list_p->bytes) + #define NUM_IDS_IN_LIST(list_p) CPU_COUNT_S(list_p->bytes, list_p->set_p) #define ADD_ID_TO_LIST(k, list_p) CPU_SET_S(k, list_p->bytes, list_p->set_p) #define CLR_ID_IN_LIST(k, list_p) CPU_CLR_S(k, list_p->bytes, list_p->set_p) @@ 
-361,6 +382,25 @@ terminate_string: return (p - str_p); } +id_list_p all_cpus_list_p = NULL; +id_list_p all_nodes_list_p = NULL; +char *reserved_cpu_str = NULL; +id_list_p reserved_cpu_mask_list_p = NULL; +uint64_t node_info_time_stamp = 0; + + +int read_one_line(char *buf, int buf_size, char *fname) { +    int fd = open(fname, O_RDONLY, 0); +    if (fd < 0) { +        return fd; +    } +    int bytes = read(fd, buf, buf_size); +    if ((bytes > 0) && (buf[bytes - 1] == '\n')) { +        buf[bytes - 1] = '\0'; +    } +    close(fd); +    return bytes; +} typedef struct node_data { @@ -376,6 +416,16 @@ node_data_p node = NULL; +uint64_t min_node_CPUs_free = MAXINT; +uint64_t min_node_MBs_free = MAXINT; +uint64_t max_node_CPUs_free = 0; +uint64_t max_node_MBs_free = 0; +uint64_t avg_node_CPUs_free = 0; +uint64_t avg_node_MBs_free = 0; +double stddev_node_CPUs_free = 0.0; +double stddev_node_MBs_free = 0.0; + + // RING_BUF_SIZE must be a power of two #define RING_BUF_SIZE 8 @@ -387,14 +437,14 @@ typedef struct process_data { uint64_t data_time_stamp; // hundredths of seconds uint64_t bind_time_stamp; uint64_t num_threads; +    uint64_t MBs_size; uint64_t MBs_used; uint64_t cpu_util; uint64_t CPUs_used; // scaled * ONE_HUNDRED uint64_t CPUs_used_ring_buf[RING_BUF_SIZE]; int ring_buf_ix; -    int dup_bind_count; char *comm; -    char *cpuset_name; +    id_list_p node_list_p; } process_data_t, *process_data_p; @@ -475,12 +525,15 @@ int process_hash_update(process_data_p newp) { } p->CPUs_used = max_CPUs_used; } +// FIXME: seems like this comm check should not be necessary every update +// But it does happen only for candidates that cross the memory threshold... if ((!p->comm) || (strcmp(p->comm, newp->comm))) { if (p->comm) { free(p->comm); } p->comm = strdup(newp->comm); } +    p->MBs_size = newp->MBs_size; p->MBs_used = newp->MBs_used; p->cpu_util = newp->cpu_util; p->num_threads = newp->num_threads; @@ -489,6 +542,11 @@ return new_hash_table_entry; } +void process_hash_clear_all_bind_time_stamps() { +    for (int ix = 0; (ix < process_hash_table_size); ix++) { +        process_hash_table[ix].bind_time_stamp = 0; +    } +} int process_hash_rehash(int old_ix) { // Given the index of a table entry that would otherwise be orphaned by @@ -510,7 +568,7 @@ int process_hash_remove(int pid) { // remove the target process_data_p dp = &process_hash_table[ix]; if (dp->comm) { free(dp->comm); } -    if (dp->cpuset_name) { free(dp->cpuset_name); } +    FREE_LIST(dp->node_list_p); memset(dp, 0, sizeof(process_data_t)); // bubble up the collision chain and rehash if needed for (;;) { @@ -564,15 +622,29 @@ void process_hash_table_dump() { process_data_p p = &process_hash_table[ix]; if (p->pid) { numad_log(LOG_DEBUG, -                "ix: %d PID: %d %s Thds: %d CPU %ld MBs: %ld Data TS: %ld Bind TS: %ld\n", +                "ix: %d PID: %d %s Thds: %d CPU %ld MBs: %ld/%ld Data TS: %ld Bind TS: %ld\n", ix, p->pid, ((p->comm != NULL) ? 
p->comm : "(Null)"), p->num_threads, - p->CPUs_used, p->MBs_used, p->data_time_stamp, p->bind_time_stamp); + p->CPUs_used, p->MBs_used, p->MBs_size, p->data_time_stamp, p->bind_time_stamp); + // FIXME: make this dump every field + } + } +} + +void remove_obsolete_cpuset_if_no_tasks(int pid) { + // PID parameter has already been checked via kill(0) and seems dead + char buf[BUF_SIZE]; + char fname[FNAME_SIZE]; + snprintf(fname, FNAME_SIZE, "%s/numad.%d/tasks", cpuset_dir, pid); + if ((access(fname, F_OK) == 0) && (read_one_line(buf, BUF_SIZE, fname) <= 1)) { + snprintf(fname, FNAME_SIZE, "%s/numad.%d", cpuset_dir, pid); + numad_log(LOG_NOTICE, "Removing obsolete cpuset: %s\n", fname); + if (rmdir(fname) < 0) { + numad_log(LOG_ERR, "bad cpuset rmdir\n"); } } } void process_hash_table_cleanup(uint64_t update_time) { - int cpusets_removed = 0; int num_hash_entries_used = 0; for (int ix = 0; (ix < process_hash_table_size); ix++) { process_data_p p = &process_hash_table[ix]; @@ -583,40 +655,56 @@ void process_hash_table_cleanup(uint64_t update_time) { p->data_time_stamp = 0; p->CPUs_used = 0; // Check for dead pids and remove them... - char fname[FNAME_SIZE]; - snprintf(fname, FNAME_SIZE, "/proc/%d", p->pid); - if (access(fname, F_OK) < 0) { + if ((kill(p->pid, 0) == -1) && (errno == ESRCH)) { // Seems dead. Forget this pid -- after first checking // and removing obsolete numad.PID cpuset directories. - snprintf(fname, FNAME_SIZE, "%s/numad.%d", cpuset_dir, p->pid); - if (access(fname, F_OK) == 0) { - numad_log(LOG_NOTICE, "Removing obsolete cpuset: %s\n", fname); - int rc = rmdir(fname); - if (rc >= 0) { - cpusets_removed += 1; - } else { - numad_log(LOG_ERR, "bad cpuset rmdir\n"); - // exit(EXIT_FAILURE); - } - } + remove_obsolete_cpuset_if_no_tasks(p->pid); process_hash_remove(p->pid); num_hash_entries_used -= 1; } } } } - if (cpusets_removed > 0) { - // Expire all the duplicate bind counts so things will be re-evaluated sooner. - for (int ix = 0; (ix < process_hash_table_size); ix++) { - process_hash_table[ix].dup_bind_count = 0; - } - } // Keep hash table approximately half empty if ((num_hash_entries_used * 7) / 4 > process_hash_table_size) { process_hash_table_expand(); } } +static int name_starts_with_numad(const struct dirent *dptr) { + return (strncmp(dptr->d_name, "numad.", 6) == 0); +} + +void *clean_obsolete_cpusets(void *arg) { + // int arg_value = *(int *)arg; + for (;;) { + // Loop here forever (slowly) cleaning obsolete cpusets + sleep(571); // Arbitrary number a little less than ten minutes + struct dirent **namelist; + int files = scandir(cpuset_dir, &namelist, name_starts_with_numad, NULL); + if (files < 0) { + numad_log(LOG_ERR, "Troubled scanning for obsolete cpusets\n"); + continue; + } + for (int ix = 0; (ix < files); ix++) { + char *p = &(namelist[ix]->d_name[6]); + if (isdigit(*p)) { + int pid; + CONVERT_DIGITS_TO_NUM(p, pid); + // If it seems like a valid PID -- that is NOT in the hash + // table -- and the process appears to be dead, then try to + // delete the cpuset directory. (Dead PIDs we know about in + // the hash table will be cleaned separately.) 
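Both cleanup paths here replace the old access("/proc/<pid>") probe with the kill(pid, 0) liveness test. As a standalone sketch of the idiom (names illustrative): signal 0 performs only the existence and permission checks without delivering anything, so ESRCH reliably means the PID is gone, while EPERM would mean it exists but is owned by someone else -- a case that does not arise for a root-run daemon like numad.

/* pid_alive.c -- sketch of the kill(pid, 0) liveness idiom */
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

/* Returns 1 if pid appears to exist, 0 if it is definitely gone. */
int pid_exists(pid_t pid) {
    if (kill(pid, 0) == 0) { return 1; }   /* signal 0: checks only, delivers nothing */
    return (errno != ESRCH);               /* EPERM etc. still means "exists" */
}

int main(int argc, char **argv) {
    pid_t pid = (argc > 1) ? (pid_t)atol(argv[1]) : getpid();
    printf("PID %ld %s\n", (long)pid, pid_exists(pid) ? "exists" : "is gone");
    return 0;
}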
+ if ((pid > 10) && (process_hash_lookup(pid) < 0) + && (kill(pid, 0) == -1) && (errno == ESRCH)) { + remove_obsolete_cpuset_if_no_tasks(pid); + } + } + free(namelist[ix]); + } + free(namelist); + } +} typedef struct pid_list { @@ -631,9 +719,7 @@ pid_list_p insert_pid_into_pid_list(pid_list_p list_ptr, long pid) { if (process_hash_table != NULL) { int hash_ix = process_hash_lookup(pid); if ((hash_ix >= 0) && (list_ptr == include_pid_list)) { - // Clear dup_bind_count and interleaved flag, - // in case user wants it to be re-evaluated soon - process_hash_table[hash_ix].dup_bind_count = 0; + // Clear interleaved flag, in case user wants it to be re-evaluated process_hash_table[hash_ix].flags &= ~PROCESS_FLAG_INTERLEAVED; } } @@ -699,19 +785,22 @@ void print_version_and_exit(char *prog_name) { void print_usage_and_exit(char *prog_name) { fprintf(stderr, "Usage: %s ...\n", prog_name); + fprintf(stderr, "-C 1 to count inactive file cache as available memory (default 1)\n"); + fprintf(stderr, "-C 0 to count inactive file cache memory as unavailable (default 1)\n"); fprintf(stderr, "-d for debug logging (same effect as '-l 7')\n"); fprintf(stderr, "-D to specify cgroup mount point\n"); fprintf(stderr, "-h to print this usage info\n"); fprintf(stderr, "-H to set THP scan_sleep_ms (default 1000)\n"); fprintf(stderr, "-i [:] to specify interval seconds\n"); - fprintf(stderr, "-K 1 to keep interleaved memory spread across nodes\n"); - fprintf(stderr, "-K 0 to merge interleaved memory to local NUMA nodes\n"); - fprintf(stderr, "-l to specify logging level (usually 5, 6, or 7)\n"); + fprintf(stderr, "-K 1 to keep interleaved memory spread across nodes (default 0)\n"); + fprintf(stderr, "-K 0 to merge interleaved memory to local NUMA nodes (default 0)\n"); + fprintf(stderr, "-l to specify logging level (usually 5, 6, or 7 -- default 5)\n"); fprintf(stderr, "-p to add PID to inclusion pid list\n"); fprintf(stderr, "-r to remove PID from explicit pid lists\n"); fprintf(stderr, "-R to reserve some CPUs for non-numad use\n"); - fprintf(stderr, "-S 1 to scan all processes\n"); - fprintf(stderr, "-S 0 to scan only explicit PID list processes\n"); + fprintf(stderr, "-S 1 to scan all processes (default 1)\n"); + fprintf(stderr, "-S 0 to scan only explicit PID list processes (default 1)\n"); + fprintf(stderr, "-t to specify thread / logical CPU percent (default 20)\n"); fprintf(stderr, "-u to specify target utilization percent (default 85)\n"); fprintf(stderr, "-v for verbose (same effect as '-l 6')\n"); fprintf(stderr, "-V to show version info\n"); @@ -722,6 +811,10 @@ void print_usage_and_exit(char *prog_name) { void set_thp_scan_sleep_ms(int new_ms) { + if (new_ms < 1) { + // 0 means do not change the system default + return; + } char *thp_scan_fname = "/sys/kernel/mm/transparent_hugepage/khugepaged/scan_sleep_millisecs"; int fd = open(thp_scan_fname, O_RDWR, 0); if (fd >= 0) { @@ -854,6 +947,43 @@ fail_numad_run_file: } +int count_set_bits_in_hex_list_file(char *fname) { + int sum = 0; + int fd = open(fname, O_RDONLY, 0); + if (fd >= 0) { + char buf[BUF_SIZE]; + int bytes = read(fd, buf, BUF_SIZE); + close(fd); + for (int ix = 0; (ix < bytes); ix++) { + char c = tolower(buf[ix]); + switch (c) { + case '0' : sum += 0; break; + case '1' : sum += 1; break; + case '2' : sum += 1; break; + case '3' : sum += 2; break; + case '4' : sum += 1; break; + case '5' : sum += 2; break; + case '6' : sum += 2; break; + case '7' : sum += 3; break; + case '8' : sum += 1; break; + case '9' : sum += 2; break; + case 'a' : sum 
+= 2; break; + case 'b' : sum += 3; break; + case 'c' : sum += 2; break; + case 'd' : sum += 3; break; + case 'e' : sum += 3; break; + case 'f' : sum += 4; break; + case ' ' : sum += 0; break; + case ',' : sum += 0; break; + case '\n' : sum += 0; break; + default : numad_log(LOG_CRIT, "Unexpected character in list\n"); exit(EXIT_FAILURE); + } + } + } + return sum; +} + + int get_num_cpus() { int n1 = sysconf(_SC_NPROCESSORS_CONF); int n2 = sysconf(_SC_NPROCESSORS_ONLN); @@ -939,129 +1069,244 @@ static int name_starts_with_digit(const struct dirent *dptr) { } -int bind_process_and_migrate_memory(int pid, char *cpuset_name, id_list_p node_list_p, id_list_p cpu_list_p) { - // Check basic parameter validity. - if (pid <= 0) { +int write_to_cpuset_file(char *fname, char *s) { + int fd = open(fname, O_WRONLY | O_TRUNC, 0); + if (fd == -1) { + numad_log(LOG_CRIT, "Could not open %s -- errno: %d\n", fname, errno); + return -1; + } + numad_log(LOG_DEBUG, "Writing %s to: %s\n", s, fname); + if (write(fd, s, strlen(s)) <= 0) { + numad_log(LOG_CRIT, "Could not write %s to %s -- errno: %d\n", s, fname, errno); + return -1; + } + close(fd); + return 0; +} + +int configure_cpuset(char *cpuset_name, char *node_list_str, char *cpu_list_str) { + int rc = 0; + char fname[FNAME_SIZE]; + // Write "1" out to cpuset.memory_migrate file + snprintf(fname, FNAME_SIZE, "%s/cpuset.memory_migrate", cpuset_name); + rc += write_to_cpuset_file(fname, "1"); + // For memory binding, write node IDs out to cpuset.mems file + snprintf(fname, FNAME_SIZE, "%s/cpuset.mems", cpuset_name); + rc += write_to_cpuset_file(fname, node_list_str); + // For CPU binding, write CPU IDs out to cpuset.cpus file + snprintf(fname, FNAME_SIZE, "%s/cpuset.cpus", cpuset_name); + rc += write_to_cpuset_file(fname, cpu_list_str); + return rc; +} + +int bind_process_and_migrate_memory(process_data_p p) { + char buf[BUF_SIZE]; + char fname[FNAME_SIZE]; + char pid_cpuset_name[FNAME_SIZE]; + uint64_t t0 = get_time_stamp(); + // Parameter p is a pointer to an element in the hash table + if ((!p) || (p->pid < 1)) { numad_log(LOG_CRIT, "Bad PID to bind\n"); exit(EXIT_FAILURE); } - if ((cpuset_name == NULL) || (strlen(cpuset_name) == 0)) { - numad_log(LOG_CRIT, "Bad cpuset name to bind\n"); + if (!p->node_list_p) { + numad_log(LOG_CRIT, "Cannot bind to unspecified node(s)\n"); exit(EXIT_FAILURE); } - int nodes; - if ((node_list_p == NULL) || ((nodes = NUM_IDS_IN_LIST(node_list_p)) == 0)) { - numad_log(LOG_CRIT, "Cannot bind to unspecified node\n"); - exit(EXIT_FAILURE); + // Get cpuset name for this PID, or make a new cpuset if necessary + snprintf(fname, FNAME_SIZE, "/proc/%d/cpuset", p->pid); + if (read_one_line(buf, BUF_SIZE, fname) <= 0) { + numad_log(LOG_WARNING, "Could not get cpuset of PID %d.\n", p->pid); + return 0; // Assume the process terminated } - // Cpu_list_p is optional and may be NULL... 
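The switch table in count_set_bits_in_hex_list_file() above spells out the popcount of every hex digit. For comparison only, the same computation can be written with GCC's __builtin_popcount applied per nibble (the patch already assumes gcc, per the compile line near the top); this sketch parses the same comma-separated cpumask format as /sys/devices/system/cpu/cpu0/topology/thread_siblings.

/* hexbits.c -- compact equivalent of the per-digit switch table */
#include <ctype.h>
#include <stdio.h>

int count_set_bits_in_hex_str(const char *s) {
    int sum = 0;
    for (; *s != '\0'; s++) {
        char c = (char)tolower((unsigned char)*s);
        if (isxdigit((unsigned char)c)) {
            int nibble = (c <= '9') ? (c - '0') : (c - 'a' + 10);
            sum += __builtin_popcount((unsigned)nibble);  /* gcc/clang builtin */
        }
        /* separators (',', ' ', '\n') contribute nothing, as in the table */
    }
    return sum;
}

int main(void) {
    /* a two-thread sibling mask typically has two bits set */
    printf("%d\n", count_set_bits_in_hex_str("00000003,00000000"));  /* prints 2 */
    return 0;
}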
- // Generate CPU id list from the specified node list if necessary - if (cpu_list_p == NULL) { - static id_list_p tmp_cpu_list_p; - CLEAR_LIST(tmp_cpu_list_p); - int node_id = 0; - while (nodes) { - if (ID_IS_IN_LIST(node_id, node_list_p)) { - OR_LISTS(tmp_cpu_list_p, tmp_cpu_list_p, node[node_id].cpu_list_p); - nodes -= 1; - } - node_id += 1; - } - cpu_list_p = tmp_cpu_list_p; - } - // Make the cpuset directory if necessary - char cpuset_name_buf[FNAME_SIZE]; - snprintf(cpuset_name_buf, FNAME_SIZE, "%s%s", cpuset_dir, cpuset_name); - char *p = &cpuset_name_buf[strlen(cpuset_dir)]; - if (!strcmp(p, "/")) { - // Make a cpuset directory for this process - snprintf(cpuset_name_buf, FNAME_SIZE, "%s/numad.%d", cpuset_dir, pid); - numad_log(LOG_NOTICE, "Making new cpuset: %s\n", cpuset_name_buf); - int rc = mkdir(cpuset_name_buf, S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); - if (rc == -1) { + if (!strcmp(buf, "/")) { + // Default cpuset name, so make a new cpuset directory for this PID + snprintf(pid_cpuset_name, FNAME_SIZE, "%s/numad.%d", cpuset_dir, p->pid); + numad_log(LOG_NOTICE, "Making new cpuset: %s\n", pid_cpuset_name); + if (mkdir(pid_cpuset_name, S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH) < 0) { numad_log(LOG_CRIT, "Bad cpuset mkdir -- errno: %d\n", errno); return 0; } + // Temporarily enable all CPUs for a new cpuset... + char all_cpus_list_buf[BUF_SIZE]; + str_from_id_list(all_cpus_list_buf, BUF_SIZE, all_cpus_list_p); + // Write CPU IDs out to cpuset.cpus file for CPU binding of main PID + snprintf(fname, FNAME_SIZE, "%s/cpuset.cpus", pid_cpuset_name); + if (write_to_cpuset_file(fname, all_cpus_list_buf) < 0) { + numad_log(LOG_CRIT, "Could not configure cpuset.cpus: %s\n", pid_cpuset_name); + return 0; // Assume the process terminated + } + } else { + // Save the existing nondefault cpuset name for this PID + snprintf(pid_cpuset_name, FNAME_SIZE, "%s%s", cpuset_dir, buf); } - cpuset_name = cpuset_name_buf; - // Now that we have a cpuset for pid and a populated cpulist, - // start the actual binding and migration. - uint64_t t0 = get_time_stamp(); - + // Configure the main PID cpuset with desired nodes and memory migrate + // flag. Defer the CPU binding for the main PID until after the PID is + // actually written to the task file and the memory has been moved. + char node_list_buf[BUF_SIZE]; + str_from_id_list(node_list_buf, BUF_SIZE, p->node_list_p); // Write "1" out to cpuset.memory_migrate file - char fname[FNAME_SIZE]; - snprintf(fname, FNAME_SIZE, "%s/cpuset.memory_migrate", cpuset_name); + snprintf(fname, FNAME_SIZE, "%s/cpuset.memory_migrate", pid_cpuset_name); + if (write_to_cpuset_file(fname, "1") < 0) { + numad_log(LOG_CRIT, "Could not configure cpuset: %s\n", pid_cpuset_name); + return 0; // Assume the process terminated + } + // For memory binding, write node IDs out to cpuset.mems file + snprintf(fname, FNAME_SIZE, "%s/cpuset.mems", pid_cpuset_name); + if (write_to_cpuset_file(fname, node_list_buf) < 0) { + numad_log(LOG_CRIT, "Could not configure cpuset: %s\n", pid_cpuset_name); + return 0; // Assume the process terminated + } + // Open the main PID cpuset tasks file and + // bind the main PID in the main cpuset now. 
+ snprintf(fname, FNAME_SIZE, "%s/tasks", pid_cpuset_name); int fd = open(fname, O_WRONLY | O_TRUNC, 0); - if (fd == -1) { - numad_log(LOG_CRIT, "Could not open cpuset.memory_migrate -- errno: %d\n", errno); - return 0; + if (fd < 0) { + numad_log(LOG_CRIT, "Could not open %s -- errno: %d\n", fname, errno); + return 0; // Assume the process terminated } - write(fd, "1", 1); - close(fd); - - // Write node IDs out to cpuset.mems file - char node_list_buf[BUF_SIZE]; - snprintf(fname, FNAME_SIZE, "%s/cpuset.mems", cpuset_name); - fd = open(fname, O_WRONLY | O_TRUNC, 0); - if (fd == -1) { - numad_log(LOG_CRIT, "Could not open cpuset.mems -- errno: %d\n", errno); - return 0; + numad_log(LOG_NOTICE, "Including PID: %d in cpuset: %s\n", p->pid, pid_cpuset_name); + char pid_str[FNAME_SIZE]; + snprintf(pid_str, FNAME_SIZE, "%d", p->pid); + if (write(fd, pid_str, strlen(pid_str)) <= 0) { + numad_log(LOG_CRIT, "Could not write %s to cpuset: %s -- errno: %d\n", pid_str, pid_cpuset_name, errno); + close(fd); + return 0; // Assume the process terminated } - int len = str_from_id_list(node_list_buf, BUF_SIZE, node_list_p); - write(fd, node_list_buf, len); - close(fd); - - // Write CPU IDs out to cpuset.cpus file - char cpu_list_buf[BUF_SIZE]; - snprintf(fname, FNAME_SIZE, "%s/cpuset.cpus", cpuset_name); - fd = open(fname, O_WRONLY | O_TRUNC, 0); - if (fd == -1) { - numad_log(LOG_CRIT, "Could not open cpuset.cpus -- errno: %d\n", errno); - return 0; + // Generate CPU binding list derived from node bind list. + static id_list_p cpu_bind_list_p; + CLEAR_CPU_LIST(cpu_bind_list_p); + int nodes = NUM_IDS_IN_LIST(p->node_list_p); + int node_id = 0; + while (nodes) { + if (ID_IS_IN_LIST(node_id, p->node_list_p)) { + OR_LISTS(cpu_bind_list_p, cpu_bind_list_p, node[node_id].cpu_list_p); + nodes -= 1; + } + node_id += 1; } - len = str_from_id_list(cpu_list_buf, BUF_SIZE, cpu_list_p); - write(fd, cpu_list_buf, len); - close(fd); - - // Copy pid tasks one at a time to tasks file - snprintf(fname, FNAME_SIZE, "%s/tasks", cpuset_name); - fd = open(fname, O_WRONLY | O_TRUNC, 0); - if (fd == -1) { - numad_log(LOG_CRIT, "Could not open tasks -- errno: %d\n", errno); - return 0; + char cpu_bind_list_buf[BUF_SIZE]; + str_from_id_list(cpu_bind_list_buf, BUF_SIZE, cpu_bind_list_p); + // Write CPU IDs out to cpuset.cpus file for CPU binding of main PID + snprintf(fname, FNAME_SIZE, "%s/cpuset.cpus", pid_cpuset_name); + if (write_to_cpuset_file(fname, cpu_bind_list_buf) < 0) { + numad_log(LOG_CRIT, "Could not configure cpuset: %s\n", pid_cpuset_name); + return 0; // Assume the process terminated } - snprintf(fname, FNAME_SIZE, "/proc/%d/task", pid); + // Leave fd open in case process is multithreaded and we need to write more + // (sub) task IDs there. In case multithreaded, make sure all the subtasks + // for this PID are in a cpuset. If not already in cpuset, put them in the + // main cpuset. Start by getting the name list of all tasks for this PID. 
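The subtask handling below depends on enumerating every thread of the process by scanning /proc/<pid>/task with a digits-only filter, the same scandir(3) pattern name_starts_with_digit() implements. A standalone sketch of just that enumeration (the procfs layout is real; the rest is illustrative):

/* list_tasks.c -- enumerate thread IDs of a process via /proc/<pid>/task */
#include <ctype.h>
#include <dirent.h>
#include <stdio.h>
#include <stdlib.h>

static int starts_with_digit(const struct dirent *d) {
    return isdigit((unsigned char)d->d_name[0]);
}

int main(int argc, char **argv) {
    int pid = (argc > 1) ? atoi(argv[1]) : 1;
    char dname[64];
    snprintf(dname, sizeof(dname), "/proc/%d/task", pid);
    struct dirent **namelist;
    int num_tasks = scandir(dname, &namelist, starts_with_digit, NULL);
    if (num_tasks <= 0) {
        fprintf(stderr, "no tasks -- PID %d may have exited\n", pid);
        return 1;
    }
    for (int ix = 0; ix < num_tasks; ix++) {
        printf("TID: %s\n", namelist[ix]->d_name);
        free(namelist[ix]);   /* scandir allocates every entry */
    }
    free(namelist);
    return 0;
}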
struct dirent **namelist; - int files = scandir(fname, &namelist, name_starts_with_digit, NULL); - if (files < 0) { - numad_log(LOG_WARNING, "Could not scandir task list\n"); + snprintf(fname, FNAME_SIZE, "/proc/%d/task", p->pid); + int num_tasks = scandir(fname, &namelist, name_starts_with_digit, NULL); + if (num_tasks <= 0) { + numad_log(LOG_WARNING, "Could not scandir task list for PID: %d\n", p->pid); + close(fd); return 0; // Assume the process terminated } - for (int ix = 0; (ix < files); ix++) { - // copy pid tasks, one at a time - numad_log(LOG_NOTICE, "Including task: %s\n", namelist[ix]->d_name); - write(fd, namelist[ix]->d_name, strlen(namelist[ix]->d_name)); - free(namelist[ix]); + if (num_tasks == 1) { + // This is the normal nonthreaded case. No sub tasks -- only the + // single main PID task, which is already bound above... + free(namelist[0]); + } else { + // Multithreaded so check all of the multiple subtasks. Avoid redundant + // subtask cpuset configuration by keeping a list of unique cpusets as + // we check each subtask. If the subtasks have only default cpuset + // names, bind those subtasks into the main cpuset with the main PID + // instead of adding them to the list. (cpuset_list is static so we + // can reuse the allocated array of pointers.) + int num_names = 0; + static char **cpuset_list; + static int cpuset_list_size; + for (int ix = 0; (ix < num_tasks); ix++) { + // Check the cpuset name for each task + if (!strcmp(namelist[ix]->d_name, pid_str)) { + // This is the main PID task, which is already bound above. Skip it here. + free(namelist[ix]); + continue; + } + snprintf(fname, FNAME_SIZE, "/proc/%d/task/%s/cpuset", p->pid, namelist[ix]->d_name); + if (read_one_line(buf, BUF_SIZE, fname) <= 0) { + numad_log(LOG_WARNING, "Could not open %s. Assuming thread completed.\n", fname); + free(namelist[ix]); + continue; + } + if (strcmp(buf, "/")) { + // Subtask already has a nondefault cpuset name. Add this + // subtask cpuset name to the list of unique cpuset names. Do + // sequential search comparisons first to verify uniqueness. + snprintf(fname, FNAME_SIZE, "%s%s", cpuset_dir, buf); + int iy = 0; + while (iy < num_names) { + if (!strcmp(fname, cpuset_list[iy])) { + break; // because we already have this cpuset name in the list + } + iy += 1; + } + if (iy == num_names) { + // We got to the end of the cpulist, so this is a new cpuset name not yet in the list + if (num_names == cpuset_list_size) { + if (cpuset_list_size == 0) { + cpuset_list_size = 10; + } else { + cpuset_list_size *= 2; + } + cpuset_list = realloc(cpuset_list, (cpuset_list_size * sizeof(char *))); + if (cpuset_list == NULL) { + numad_log(LOG_CRIT, "realloc failed\n"); + exit(EXIT_FAILURE); + } + } + // Configure this subtask cpuset and, if successful, save a + // copy of the name in the unique cpuset list. + if (configure_cpuset(fname, node_list_buf, cpu_bind_list_buf) < 0) { + numad_log(LOG_WARNING, "Could not configure cpuset %s. Assuming thread completed.\n", fname); + free(namelist[ix]); + continue; + } else { + cpuset_list[num_names++] = strdup(fname); + } + } + } else { + // This task ID has the default cpuset name. Just add this task ID to the main PID cpuset. 
+ numad_log(LOG_NOTICE, "Including task: %s in cpuset: %s\n", namelist[ix]->d_name, pid_cpuset_name); + if (write(fd, namelist[ix]->d_name, strlen(namelist[ix]->d_name)) <= 0) { + numad_log(LOG_WARNING, "Could not write to cpuset: %s -- errno: %d\n", pid_cpuset_name, errno); + free(namelist[ix]); + continue; // Assuming thread completed. + } + } + free(namelist[ix]); + } + // Done with subtask unique cpuset names for this PID. Free them. + for (int ix = 0; (ix < num_names); ix++) { + free(cpuset_list[ix]); + } } free(namelist); close(fd); - - uint64_t t1 = get_time_stamp(); // Check pid still active - snprintf(fname, FNAME_SIZE, "/proc/%d", pid); + snprintf(fname, FNAME_SIZE, "/proc/%d", p->pid); if (access(fname, F_OK) < 0) { - numad_log(LOG_WARNING, "Could not migrate pid\n"); - return 0; // Assume the process terminated + numad_log(LOG_WARNING, "Could not migrate pid %d\n", p->pid); + return 0; + } else { + uint64_t t1 = get_time_stamp(); + p->bind_time_stamp = t1; + numad_log(LOG_NOTICE, "PID %d moved to node(s) %s in %d.%d seconds\n", p->pid, node_list_buf, (t1-t0)/100, (t1-t0)%100); + return 1; } - numad_log(LOG_NOTICE, "PID %d moved to node(s) %s in %d.%d seconds\n", pid, node_list_buf, (t1-t0)/100, (t1-t0)%100); - return 1; } void show_nodes() { - time_t ts = time(NULL); - fprintf(log_fs, "%s", ctime(&ts)); - fprintf(log_fs, "Nodes: %d\n", num_nodes); + fprintf(log_fs, "\n"); + numad_log(LOG_INFO, "Nodes: %d\n", num_nodes); + fprintf(log_fs, "Min CPUs free: %ld, Max CPUs: %ld, Avg CPUs: %ld, StdDev: %lg\n", + min_node_CPUs_free, max_node_CPUs_free, avg_node_CPUs_free, stddev_node_CPUs_free); + fprintf(log_fs, "Min MBs free: %ld, Max MBs: %ld, Avg MBs: %ld, StdDev: %lg\n", + min_node_MBs_free, max_node_MBs_free, avg_node_MBs_free, stddev_node_MBs_free); for (int ix = 0; (ix < num_nodes); ix++) { fprintf(log_fs, "Node %d: MBs_total %ld, MBs_free %6ld, CPUs_total %ld, CPUs_free %4ld, Distance: ", ix, node[ix].MBs_total, node[ix].MBs_free, node[ix].CPUs_total, node[ix].CPUs_free); @@ -1072,7 +1317,6 @@ void show_nodes() { str_from_id_list(buf, BUF_SIZE, node[ix].cpu_list_p); fprintf(log_fs, " CPUs: %s\n", buf); } - fprintf(log_fs, "\n"); fflush(log_fs); } @@ -1088,7 +1332,7 @@ int cur_cpu_data_buf = 0; void update_cpu_data() { // Parse idle percents from CPU stats in /proc/stat cpu lines - static FILE *fs = NULL; + static FILE *fs; if (fs != NULL) { rewind(fs); } else { @@ -1130,7 +1374,8 @@ void update_cpu_data() { while (!isdigit(*p)) { p++; } while (isdigit(*p)) { p++; } // skip nice while (!isdigit(*p)) { p++; } while (isdigit(*p)) { p++; } // skip system while (!isdigit(*p)) { p++; } - uint64_t idle = *p++ - '0'; while (isdigit(*p)) { idle *= 10; idle += (*p++ - '0'); } + uint64_t idle; + CONVERT_DIGITS_TO_NUM(p, idle); cpu_data_buf[new].idle[cpu_id] = idle; } } @@ -1152,12 +1397,6 @@ int node_and_digits(const struct dirent *dptr) { } -id_list_p all_cpus_list_p = NULL; -id_list_p all_nodes_list_p = NULL; -char *reserved_cpu_str = NULL; -id_list_p reserved_cpu_mask_list_p = NULL; -uint64_t node_info_time_stamp = 0; - int update_nodes() { char fname[FNAME_SIZE]; @@ -1166,6 +1405,7 @@ int update_nodes() { uint64_t time_stamp = get_time_stamp(); #define STATIC_NODE_INFO_DELAY (600 * ONE_HUNDRED) if ((num_nodes == 0) || (node_info_time_stamp + STATIC_NODE_INFO_DELAY < time_stamp)) { + node_info_time_stamp = time_stamp; // Count directory names of the form: /sys/devices/system/node/node struct dirent **namelist; int num_files = scandir ("/sys/devices/system/node", &namelist, 
node_and_digits, NULL); @@ -1192,8 +1432,15 @@ int update_nodes() { } num_nodes = num_files; } - CLEAR_LIST(all_cpus_list_p); - CLEAR_LIST(all_nodes_list_p); + sum_CPUs_total = 0; + CLEAR_CPU_LIST(all_cpus_list_p); + CLEAR_NODE_LIST(all_nodes_list_p); + // Figure out how many threads per core there are (for later discounting of hyper-threads) + threads_per_core = count_set_bits_in_hex_list_file("/sys/devices/system/cpu/cpu0/topology/thread_siblings"); + if (threads_per_core < 1) { + numad_log(LOG_CRIT, "Could not count threads per core\n"); + exit(EXIT_FAILURE); + } // For each "node" filename present, save in node[ix].node_id // Note that the node id might not necessarily match the node ix. // Also populate the cpu lists and distance vectors for this node. @@ -1210,14 +1457,22 @@ int update_nodes() { int fd = open(fname, O_RDONLY, 0); if ((fd >= 0) && (read(fd, buf, BIG_BUF_SIZE) > 0)) { // get cpulist from the cpulist string - CLEAR_LIST(node[node_ix].cpu_list_p); + CLEAR_CPU_LIST(node[node_ix].cpu_list_p); int n = add_ids_to_list_from_str(node[node_ix].cpu_list_p, buf); if (reserved_cpu_str != NULL) { AND_LISTS(node[node_ix].cpu_list_p, node[node_ix].cpu_list_p, reserved_cpu_mask_list_p); n = NUM_IDS_IN_LIST(node[node_ix].cpu_list_p); } OR_LISTS(all_cpus_list_p, all_cpus_list_p, node[node_ix].cpu_list_p); - node[node_ix].CPUs_total = n * ONE_HUNDRED; + // Calculate total CPUs, but possibly discount hyper-threads + if ((threads_per_core == 1) || (htt_percent >= 100)) { + node[node_ix].CPUs_total = n * ONE_HUNDRED; + } else { + n /= threads_per_core; + node[node_ix].CPUs_total = n * ONE_HUNDRED; + node[node_ix].CPUs_total += n * (threads_per_core - 1) * htt_percent; + } + sum_CPUs_total += node[node_ix].CPUs_total; close(fd); } else { numad_log(LOG_CRIT, "Could not get node cpu list\n"); @@ -1258,12 +1513,19 @@ int update_nodes() { time_stamp = get_time_stamp(); } update_cpu_data(); + max_node_MBs_free = 0; + max_node_CPUs_free = 0; + min_node_MBs_free = MAXINT; + min_node_CPUs_free = MAXINT; + uint64_t sum_of_node_MBs_free = 0; + uint64_t sum_of_node_CPUs_free = 0; for (int node_ix = 0; (node_ix < num_nodes); node_ix++) { int node_id = node[node_ix].node_id; // Get available memory info from node/meminfo file snprintf(fname, FNAME_SIZE, "/sys/devices/system/node/node%d/meminfo", node_id); int fd = open(fname, O_RDONLY, 0); if ((fd >= 0) && (read(fd, buf, BIG_BUF_SIZE) > 0)) { + close(fd); uint64_t KB; char *p = strstr(buf, "MemTotal:"); if (p != NULL) { @@ -1274,7 +1536,7 @@ int update_nodes() { } while (!isdigit(*p)) { p++; } CONVERT_DIGITS_TO_NUM(p, KB); - node[node_ix].MBs_total = KB / KILOBYTE; + node[node_ix].MBs_total = (KB / KILOBYTE); p = strstr(p, "MemFree:"); if (p != NULL) { p += 8; @@ -1284,8 +1546,27 @@ int update_nodes() { } while (!isdigit(*p)) { p++; } CONVERT_DIGITS_TO_NUM(p, KB); - node[node_ix].MBs_free = KB / KILOBYTE; - close(fd); + node[node_ix].MBs_free = (KB / KILOBYTE); + if (use_inactive_file_cache) { + // Add inactive file cache quantity to "free" memory + p = strstr(p, "Inactive(file):"); + if (p != NULL) { + p += 15; + } else { + numad_log(LOG_CRIT, "Could not get node Inactive(file)\n"); + exit(EXIT_FAILURE); + } + while (!isdigit(*p)) { p++; } + CONVERT_DIGITS_TO_NUM(p, KB); + node[node_ix].MBs_free += (KB / KILOBYTE); + } + sum_of_node_MBs_free += node[node_ix].MBs_free; + if (min_node_MBs_free > node[node_ix].MBs_free) { + min_node_MBs_free = node[node_ix].MBs_free; + } + if (max_node_MBs_free < node[node_ix].MBs_free) { + max_node_MBs_free = 
node[node_ix].MBs_free; + } } else { numad_log(LOG_CRIT, "Could not get node meminfo\n"); exit(EXIT_FAILURE); @@ -1296,7 +1577,8 @@ int update_nodes() { if (cpu_data_buf[old_cpu_data_buf].time_stamp > 0) { uint64_t idle_ticks = 0; int cpu = 0; - int num_cpus_to_process = node[node_ix].CPUs_total / ONE_HUNDRED; + int num_lcpus = NUM_IDS_IN_LIST(node[node_ix].cpu_list_p); + int num_cpus_to_process = num_lcpus; while (num_cpus_to_process) { if (ID_IS_IN_LIST(cpu, node[node_ix].cpu_list_p)) { idle_ticks += cpu_data_buf[cur_cpu_data_buf].idle[cpu] @@ -1310,15 +1592,45 @@ int update_nodes() { // printf("Node: %d CPUs: %ld time diff %ld Idle ticks %ld\n", node_id, node[node_ix].CPUs_total, time_diff, idle_ticks); // assert(time_diff > 0); node[node_ix].CPUs_free = (idle_ticks * ONE_HUNDRED) / time_diff; + // Possibly discount hyper-threads + if ((threads_per_core > 1) && (htt_percent < 100)) { + uint64_t htt_discount = (num_lcpus - (num_lcpus / threads_per_core)) * (100 - htt_percent); + if (node[node_ix].CPUs_free > htt_discount) { + node[node_ix].CPUs_free -= htt_discount; + } else { + node[node_ix].CPUs_free = 0; + } + } if (node[node_ix].CPUs_free > node[node_ix].CPUs_total) { node[node_ix].CPUs_free = node[node_ix].CPUs_total; } + sum_of_node_CPUs_free += node[node_ix].CPUs_free; + if (min_node_CPUs_free > node[node_ix].CPUs_free) { + min_node_CPUs_free = node[node_ix].CPUs_free; + } + if (max_node_CPUs_free < node[node_ix].CPUs_free) { + max_node_CPUs_free = node[node_ix].CPUs_free; + } node[node_ix].magnitude = node[node_ix].CPUs_free * node[node_ix].MBs_free; } else { node[node_ix].CPUs_free = 0; node[node_ix].magnitude = 0; } } + avg_node_MBs_free = sum_of_node_MBs_free / num_nodes; + avg_node_CPUs_free = sum_of_node_CPUs_free / num_nodes; + double MBs_variance_sum = 0.0; + double CPUs_variance_sum = 0.0; + for (int node_ix = 0; (node_ix < num_nodes); node_ix++) { + double MBs_diff = (double)node[node_ix].MBs_free - (double)avg_node_MBs_free; + double CPUs_diff = (double)node[node_ix].CPUs_free - (double)avg_node_CPUs_free; + MBs_variance_sum += MBs_diff * MBs_diff; + CPUs_variance_sum += CPUs_diff * CPUs_diff; + } + double MBs_variance = MBs_variance_sum / (num_nodes); + double CPUs_variance = CPUs_variance_sum / (num_nodes); + stddev_node_MBs_free = sqrt(MBs_variance); + stddev_node_CPUs_free = sqrt(CPUs_variance); if (log_level >= LOG_INFO) { show_nodes(); } @@ -1352,7 +1664,7 @@ typedef struct stat_data { int64_t num_threads; // 19 int64_t itrealvalue; uint64_t starttime; - uint64_t vsize; + uint64_t vsize; // 22 int64_t rss; // 23 uint64_t rsslim; uint64_t startcode; @@ -1397,10 +1709,11 @@ process_data_p get_stat_data_for_pid(int pid, char *pid_string) { return NULL; } close(fd); + uint64_t val; char *p = buf; static process_data_t data; // Get PID from field 0 - uint64_t val = *p++ - '0'; while (isdigit(*p)) { val *= 10; val += (*p++ - '0'); } + CONVERT_DIGITS_TO_NUM(p, val); data.pid = val; // Copy comm from field 1 while (*p == ' ') { p++; } @@ -1409,23 +1722,27 @@ process_data_p get_stat_data_for_pid(int pid, char *pid_string) { // Skip fields 2 through 12 for (int ix = 0; (ix < 11); ix++) { while (*p != ' ') { p++; } while (*p == ' ') { p++; } } // Get utime from field 13 for cpu_util - val = *p++ - '0'; while (isdigit(*p)) { val *= 10; val += (*p++ - '0'); } + CONVERT_DIGITS_TO_NUM(p, val); data.cpu_util = val; // Get stime from field 14 to add on to cpu_util (which already has utime) while (*p == ' ') { p++; } - val = *p++ - '0'; while (isdigit(*p)) { val *= 10; val += (*p++ 
- '0'); } + CONVERT_DIGITS_TO_NUM(p, val); data.cpu_util += val; // Skip fields 15 through 18 while (*p == ' ') { p++; } for (int ix = 0; (ix < 4); ix++) { while (*p != ' ') { p++; } while (*p == ' ') { p++; } } // Get num_threads from field 19 - val = *p++ - '0'; while (isdigit(*p)) { val *= 10; val += (*p++ - '0'); } + CONVERT_DIGITS_TO_NUM(p, val); data.num_threads = val; - // Skip fields 20 through 22 + // Skip fields 20 through 21 while (*p == ' ') { p++; } - for (int ix = 0; (ix < 3); ix++) { while (*p != ' ') { p++; } while (*p == ' ') { p++; } } + for (int ix = 0; (ix < 2); ix++) { while (*p != ' ') { p++; } while (*p == ' ') { p++; } } + // Get vsize from field 22 to compute MBs_size + CONVERT_DIGITS_TO_NUM(p, val); + data.MBs_size = val / MEGABYTE; // Get rss from field 23 to compute MBs_used - val = *p++ - '0'; while (isdigit(*p)) { val *= 10; val += (*p++ - '0'); } + while (*p == ' ') { p++; } + CONVERT_DIGITS_TO_NUM(p, val); data.MBs_used = (val * page_size_in_bytes) / MEGABYTE; // Return pointer to data return &data; @@ -1507,20 +1824,79 @@ int update_processes() { } +int initialize_mem_node_list(process_data_p p) { + // Parameter p is a pointer to an element in the hash table + if ((!p) || (p->pid < 1)) { + numad_log(LOG_CRIT, "Cannot initialize mem node lists with bad PID\n"); + exit(EXIT_FAILURE); + } + int n = 0; + char fname[FNAME_SIZE]; + char buf[BIG_BUF_SIZE]; + CLEAR_NODE_LIST(p->node_list_p); + snprintf(fname, FNAME_SIZE, "/proc/%d/status", p->pid); + int fd = open(fname, O_RDONLY, 0); + if ((fd >= 0) && (read(fd, buf, BIG_BUF_SIZE) > 0)) { + close(fd); + char *list_str_p = strstr(buf, "Mems_allowed_list:"); + if (!list_str_p) { + numad_log(LOG_CRIT, "Could not get node Mems_allowed_list\n"); + exit(EXIT_FAILURE); + } + list_str_p += 18; + while (!isdigit(*list_str_p)) { list_str_p++; } + n = add_ids_to_list_from_str(p->node_list_p, list_str_p); + } else { + numad_log(LOG_WARNING, "Tried to research PID %d, but it apparently went away.\n", p->pid); + return 0; // Assume the process terminated + } + if (n < num_nodes) { + // If process already bound to a subset of nodes when we discover it, + // set initial bind_time_stamp to 30 minutes ago... + p->bind_time_stamp = get_time_stamp() - (1800 * ONE_HUNDRED); + } + return n; +} + + + + +uint64_t combined_value_of_weighted_resources(int ix, int mbs, int cpus, uint64_t MBs_free, uint64_t CPUs_free) { + int64_t needed_mem; + int64_t needed_cpu; + int64_t excess_mem; + int64_t excess_cpu; + if (MBs_free > mbs) { + needed_mem = mbs; + excess_mem = MBs_free - mbs; + } else { + needed_mem = MBs_free; + excess_mem = 0; + } + if (CPUs_free > cpus) { + needed_cpu = cpus; + excess_cpu = CPUs_free - cpus; + } else { + needed_cpu = CPUs_free; + excess_cpu = 0; + } + // Weight the available resources, and then calculate magnitude as + // product of available CPUs and available MBs. 
+ int64_t memfactor = (needed_mem * 10 + excess_mem * 3); + int64_t cpufactor = (needed_cpu * 8 + excess_cpu * 1); + numad_log(LOG_DEBUG, " Node[%d]: mem: %ld cpu: %ld\n", ix, memfactor, cpufactor); + return (memfactor * cpufactor); +} + id_list_p pick_numa_nodes(int pid, int cpus, int mbs, int assume_enough_cpus) { - char buf[BUF_SIZE]; - char buf2[BUF_SIZE]; if (log_level >= LOG_DEBUG) { numad_log(LOG_DEBUG, "PICK NODES FOR: PID: %d, CPUs %d, MBs %d\n", pid, cpus, mbs); } - int num_existing_mems = 0; - static id_list_p existing_mems_list_p; - CLEAR_LIST(existing_mems_list_p); - uint64_t time_stamp = get_time_stamp(); + char buf[BUF_SIZE]; + uint64_t process_CPUs = 0; static node_data_p tmp_node; static uint64_t *process_MBs; - static uint64_t *saved_magnitude_for_node; static int process_MBs_num_nodes; // See if dynamic structures need to grow. if (process_MBs_num_nodes < num_nodes + 1) { @@ -1528,121 +1904,25 @@ id_list_p pick_numa_nodes(int pid, int cpus, int mbs, int assume_enough_cpus) { // The "+1 node" is for accumulating interleaved memory process_MBs = realloc(process_MBs, process_MBs_num_nodes * sizeof(uint64_t)); tmp_node = realloc(tmp_node, num_nodes * sizeof(node_data_t) ); - saved_magnitude_for_node = realloc(saved_magnitude_for_node, num_nodes * sizeof(uint64_t)); - if ((process_MBs == NULL) || (tmp_node == NULL) || (saved_magnitude_for_node == NULL)) { + if ((process_MBs == NULL) || (tmp_node == NULL)) { numad_log(LOG_CRIT, "process_MBs realloc failed\n"); exit(EXIT_FAILURE); } } + // For existing processes, get miscellaneous process specific details int pid_ix; process_data_p p = NULL; if ((pid > 0) && ((pid_ix = process_hash_lookup(pid)) >= 0)) { p = &process_hash_table[pid_ix]; - // Quick rejection if this process has interleaved memory, but recheck it once an hour... -#define MIN_DELAY_FOR_INTERLEAVE (3600 * ONE_HUNDRED) - if (((p->flags & PROCESS_FLAG_INTERLEAVED) > 0) - && (p->bind_time_stamp + MIN_DELAY_FOR_INTERLEAVE > time_stamp)) { - if (log_level >= LOG_DEBUG) { - numad_log(LOG_DEBUG, "Skipping evaluation because of interleaved memory.\n"); - } - return NULL; - } - // Get cpuset name for this process, and existing mems binding, if any. - char fname[FNAME_SIZE]; - snprintf(fname, FNAME_SIZE, "/proc/%d/cpuset", pid); - FILE *fs = fopen(fname, "r"); - if (!fs) { - numad_log(LOG_WARNING, "Tried to research PID %d cpuset, but it apparently went away.\n", p->pid); - return NULL; // Assume the process terminated? - } - if (!fgets(buf, BUF_SIZE, fs)) { - numad_log(LOG_WARNING, "Tried to research PID %d cpuset, but it apparently went away.\n", p->pid); - fclose(fs); - return NULL; // Assume the process terminated? - } - fclose(fs); - ELIM_NEW_LINE(buf); - if ((!p->cpuset_name) || (strcmp(p->cpuset_name, buf))) { - if (p->cpuset_name != NULL) { - free(p->cpuset_name); - } - p->cpuset_name = strdup(buf); - } - if (log_level >= LOG_DEBUG) { - numad_log(LOG_DEBUG, "CPUSET_NAME: %s\n", p->cpuset_name); - } - snprintf(fname, FNAME_SIZE, "%s%s/cpuset.mems", cpuset_dir, p->cpuset_name); - fs = fopen(fname, "r"); - if ((fs) && (fgets(buf, BUF_SIZE, fs))) { - fclose(fs); - num_existing_mems = add_ids_to_list_from_str(existing_mems_list_p, buf); - if (log_level >= LOG_DEBUG) { - str_from_id_list(buf, BUF_SIZE, existing_mems_list_p); - numad_log(LOG_DEBUG, "EXISTING CPUSET NODE LIST: %s\n", buf); - } - } - // If this process was just recently bound, enforce a minimum delay - // period between repeated attempts to potentially move the memory. - // FIXME: ?? 
might this retard appropriate process expansion too much? -#define MIN_DELAY_FOR_REEVALUATION (30 * ONE_HUNDRED) - if (p->bind_time_stamp + MIN_DELAY_FOR_REEVALUATION > time_stamp) { - // Skip re-evaluation because we just did it recently. - if (log_level >= LOG_DEBUG) { - numad_log(LOG_DEBUG, "Skipping evaluation because done too recently.\n"); - } - return NULL; - } - // Look for short cut because of duplicate bindings. If we have bound - // this process to the same nodes multiple times already, and the load - // on those nodes still seems acceptable, skip the rest of this and - // just return NULL to indicate no change needed. FIXME: should figure - // out what can change that would make a rebinding desirable (e.g. (1) - // some process gets sub-optimal allocation on busy machine which - // subsequently becomes less busy leaving disadvantaged process. (2) - // node load imbalance, (3) any process split across nodes which should - // fit within a single node.) For now, just expire the dup_bid_count - // occasionally, which is a reasonably good mitigation. - // So, check to see if we should decay the dup_bind_count... -#define DUP_BIND_TIME_OUT (300 * ONE_HUNDRED) - if ((p->dup_bind_count > 0) && (p->bind_time_stamp + DUP_BIND_TIME_OUT < time_stamp)) { - p->dup_bind_count -= 1; - } - // Now, look for short cut because of duplicate bindings - if (p->dup_bind_count > 0) { - int node_id = 0; - int nodes_have_cpu = 1; - int nodes_have_ram = 1; - int n = num_existing_mems; - int min_resource_pct = 100 - target_utilization; - if (min_resource_pct < 5) { - min_resource_pct = 5; - } - while (n) { - if (ID_IS_IN_LIST(node_id, existing_mems_list_p)) { - nodes_have_cpu &= ((100 * node[node_id].CPUs_free / node[node_id].CPUs_total) >= (min_resource_pct)); - nodes_have_ram &= ((100 * node[node_id].MBs_free / node[node_id].MBs_total) >= (min_resource_pct)); - n -= 1; - } - node_id += 1; - } - if ((nodes_have_cpu) && (nodes_have_ram)) { - if (log_level >= LOG_DEBUG) { - numad_log(LOG_DEBUG, "Skipping evaluation because of repeat binding\n"); - } - return NULL; - } - if (log_level >= LOG_DEBUG) { - numad_log(LOG_DEBUG, "Evaluated for skipping by repeat binding, but CPUS: %d, RAM: %d\n", nodes_have_cpu, nodes_have_ram); - } - } - // Fourth, add up per-node memory in use by this process. This scanning - // is expensive and should be minimized. Also, old kernels dismantle - // transparent huge pages while producing the numa_maps memory - // information! + // Correct current CPUs amount for utilization factor inflation + process_CPUs = (cpus * target_utilization) / 100; + // Add up per-node memory in use by this process. + // This scanning is expensive and should be minimized. 
memset(process_MBs, 0, process_MBs_num_nodes * sizeof(uint64_t)); + char fname[FNAME_SIZE]; snprintf(fname, FNAME_SIZE, "/proc/%d/numa_maps", pid); - fs = fopen(fname, "r"); + FILE *fs = fopen(fname, "r"); if (!fs) { numad_log(LOG_WARNING, "Tried to research PID %d numamaps, but it apparently went away.\n", p->pid); return NULL; // Assume the process terminated @@ -1681,7 +1961,25 @@ id_list_p pick_numa_nodes(int pid, int cpus, int mbs, int assume_enough_cpus) { fclose(fs); for (int ix = 0; (ix <= num_nodes); ix++) { process_MBs[ix] /= MEGABYTE; - if (log_level >= LOG_DEBUG) { + if (p->bind_time_stamp) { + if ((process_MBs[ix]) && (!ID_IS_IN_LIST(ix, p->node_list_p))) { + // FIXME: If process previously bound, but memory appears + // to exist where it should not, this might identify + // processes for which the kernel does not move all the + // memory for whatever reason.... Must check for + // significant amount before doing anything about it, + // however, since memory for libraries, etc, can get moved + // around. + } + } else { + // If process has not yet been bound, set node list to existing nodes with memory + if (process_MBs[ix]) { + ADD_ID_TO_LIST(ix, p->node_list_p); + } else { + CLR_ID_IN_LIST(ix, p->node_list_p); + } + } + if ((log_level >= LOG_DEBUG) && (process_MBs[ix] > 0)) { if (ix == num_nodes) { numad_log(LOG_DEBUG, "Interleaved MBs: %ld\n", ix, process_MBs[ix]); } else { @@ -1691,89 +1989,75 @@ id_list_p pick_numa_nodes(int pid, int cpus, int mbs, int assume_enough_cpus) { } if ((process_has_interleaved_memory) && (keep_interleaved_memory)) { // Mark this process as having interleaved memory so we do not - // merge the interleaved memory. Time stamp it as done. + // merge the interleaved memory. Time stamp it as done and return. p->flags |= PROCESS_FLAG_INTERLEAVED; p->bind_time_stamp = get_time_stamp(); if (log_level >= LOG_DEBUG) { - numad_log(LOG_DEBUG, "Skipping evaluation because of interleaved memory.\n"); + numad_log(LOG_DEBUG, "Skipping evaluation of PID %d because of interleaved memory.\n", p->pid); } return NULL; } } // end of existing PID conditional + // Make a copy of node available resources array. Add in info specific to // this process to equalize available resource quantities wrt locations of - // resources already in use by this process. Inflate the value of already - // assigned memory by approximately 3/2, because moving memory is - // expensive. Average the amount of CPUs_free across the existing nodes - // used, because the threads are free to move around in that domain. After - // calculating combined magnitude of available resources, bias the values - // towards existing locations for this process. - int target_using_all_nodes = 0; - uint64_t node_CPUs_free_for_this_process = 0; + // resources already in use by this process. After calculating weighted + // magnitude of available resources, bias the values towards existing + // locations for this process. memcpy(tmp_node, node, num_nodes * sizeof(node_data_t) ); - if (num_existing_mems > 0) { - node_CPUs_free_for_this_process = cpus; // ?? Correct for utilization target inflation? 
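The process_MBs[] tallies built just above come from /proc/<pid>/numa_maps, where each mapping line carries N<node>=<pages> tokens (numad additionally reserves one extra array slot for interleaved memory). A reduced sketch of that per-node accounting; it assumes default-size pages, ignores the per-mapping kernelpagesize_kB detail, and all names are illustrative:

/* numa_maps_sum.c -- sum N<node>=<pages> tokens into per-node MB totals */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define MAX_NODES 64

int main(int argc, char **argv) {
    long pid = (argc > 1) ? atol(argv[1]) : (long)getpid();
    char fname[64];
    snprintf(fname, sizeof(fname), "/proc/%ld/numa_maps", pid);
    FILE *fs = fopen(fname, "r");
    if (!fs) { perror(fname); return 1; }
    long pages[MAX_NODES] = {0};
    long page_kb = sysconf(_SC_PAGESIZE) / 1024;   /* assumes small pages */
    char line[4096];
    while (fgets(line, sizeof(line), fs)) {
        /* walk the " N<node>=<npages>" tokens on each mapping line */
        for (char *p = strstr(line, " N"); p != NULL; p = strstr(p + 2, " N")) {
            int node; long npages;
            if ((sscanf(p, " N%d=%ld", &node, &npages) == 2) && (node < MAX_NODES)) {
                pages[node] += npages;
            }
        }
    }
    fclose(fs);
    for (int ix = 0; ix < MAX_NODES; ix++) {
        if (pages[ix]) {
            printf("Node %d: %ld MB\n", ix, (pages[ix] * page_kb) / 1024);
        }
    }
    return 0;
}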
- int node_id = 0; - int n = num_existing_mems; - while (n) { - if (ID_IS_IN_LIST(node_id, existing_mems_list_p)) { - node_CPUs_free_for_this_process += tmp_node[node_id].CPUs_free; - n -= 1; - } - node_id += 1; - } - // Divide to get average CPUs_free for the nodes in use by process - node_CPUs_free_for_this_process /= num_existing_mems; - } for (int ix = 0; (ix < num_nodes); ix++) { - if (pid > 0) { - tmp_node[ix].MBs_free += ((process_MBs[ix] * 12) / 8); - } + // Add back (biased) memory already used by this process on this node + tmp_node[ix].MBs_free += ((process_MBs[ix] * 8) / 8); // FIXME: apply bias here? if (tmp_node[ix].MBs_free > tmp_node[ix].MBs_total) { tmp_node[ix].MBs_free = tmp_node[ix].MBs_total; } - if ((num_existing_mems > 0) && (ID_IS_IN_LIST(ix, existing_mems_list_p))) { - tmp_node[ix].CPUs_free = node_CPUs_free_for_this_process; + // Add back CPU in proportion to amount of memory already used on this + // node Making assumption here that CPU execution threads are actually + // running on the same nodes where memory is assigned... FIXME: should + // we perhaps do this only if process already explicitly bound? + uint64_t prorated_CPU = (process_CPUs * process_MBs[ix]) / mbs; + if ((log_level >= LOG_DEBUG) && (prorated_CPU > 0)) { + numad_log(LOG_DEBUG, "PROCESS_CPUs[%d]: %ld\n", ix, prorated_CPU); } + tmp_node[ix].CPUs_free += prorated_CPU; if (tmp_node[ix].CPUs_free > tmp_node[ix].CPUs_total) { tmp_node[ix].CPUs_free = tmp_node[ix].CPUs_total; } - if (tmp_node[ix].CPUs_free < 1) { // force 1/100th CPU minimum + if (tmp_node[ix].CPUs_free < 1) { + // enforce 1/100th CPU minimum tmp_node[ix].CPUs_free = 1; } - if (log_level >= LOG_DEBUG) { - numad_log(LOG_DEBUG, "PROCESS_CPUs[%d]: %ld\n", ix, tmp_node[ix].CPUs_free); - } - // Calculate magnitude as product of available CPUs and available MBs - tmp_node[ix].magnitude = tmp_node[ix].CPUs_free * tmp_node[ix].MBs_free; + // numad_log(LOG_DEBUG, "Raw Node[%d]: mem: %ld cpu: %ld\n", ix, tmp_node[ix].MBs_free, tmp_node[ix].CPUs_free); + tmp_node[ix].magnitude = combined_value_of_weighted_resources(ix, mbs, cpus, tmp_node[ix].MBs_free, tmp_node[ix].CPUs_free); // Bias combined magnitude towards already assigned nodes - if (ID_IS_IN_LIST(ix, existing_mems_list_p)) { - tmp_node[ix].magnitude *= 9; - tmp_node[ix].magnitude /= 8; + if ((pid > 0) && (ID_IS_IN_LIST(ix, p->node_list_p))) { + tmp_node[ix].magnitude *= 17; + tmp_node[ix].magnitude /= 16; } - // Save the current magnitudes - saved_magnitude_for_node[ix] = tmp_node[ix].magnitude; } - // OK, figure out where to get resources for this request. - static id_list_p target_node_list_p; - CLEAR_LIST(target_node_list_p); + + // Figure out where to get resources for this request. int prev_node_used = -1; + static id_list_p target_node_list_p; + CLEAR_NODE_LIST(target_node_list_p); + // Establish a CPU flex fudge factor, on the presumption it is OK if not + // quite all the CPU request is met. However, if trying to find resources + // for pre-placement advice request, do not underestimate the amount of + // CPUs needed. Instead, err on the side of providing too many resources. + int cpu_flex = 0; + if ((pid > 0) && (target_utilization < 100)) { + // FIXME: Is half of the utilization margin a good amount of CPU flexing? 
+ cpu_flex = ((100 - target_utilization) * tmp_node[0].CPUs_total) / 200; + } + // Figure out minimum number of nodes required + int mem_req_nodes = ceil((double)mbs / (double)node[0].MBs_total); + int cpu_req_nodes = ceil((double)(cpus - cpu_flex) / (double)node[0].CPUs_total); + int min_req_nodes = mem_req_nodes; + if (min_req_nodes < cpu_req_nodes) { + min_req_nodes = cpu_req_nodes; + } // Continue to allocate more resources until request are met. - // OK if not not quite all the CPU request is met. - // FIXME: ?? Is half of the utilization margin a good amount of CPU flexing? - int cpu_flex = ((100 - target_utilization) * tmp_node[0].CPUs_total) / 200; - if (pid <= 0) { - // If trying to find resources for pre-placement advice request, do not - // underestimate the amount of CPUs needed. Instead, err on the side - // of providing too many resources. So, no flexing here... - cpu_flex = 0; - } else if (assume_enough_cpus) { - // If CPU requests "should" fit, then just make - // cpu_flex big enough to meet the cpu request. - // This causes memory to be the only consideration. - cpu_flex = cpus + 1; - } - while ((mbs > 0) || (cpus > cpu_flex)) { + while ((min_req_nodes > 0) || (mbs > 0) || ((cpus > cpu_flex) && (!assume_enough_cpus))) { if (log_level >= LOG_DEBUG) { numad_log(LOG_DEBUG, "MBs: %d, CPUs: %d\n", mbs, cpus); } @@ -1811,22 +2095,18 @@ id_list_p pick_numa_nodes(int pid, int cpus, int mbs, int assume_enough_cpus) { // last one we used. This is not going to make progress... So // just punt and use everything. OR_LISTS(target_node_list_p, target_node_list_p, all_nodes_list_p); - target_using_all_nodes = 1; break; } prev_node_used = tmp_node[0].node_id; ADD_ID_TO_LIST(tmp_node[0].node_id, target_node_list_p); - if (log_level >= LOG_DEBUG) { - str_from_id_list(buf, BUF_SIZE, existing_mems_list_p); - str_from_id_list(buf2, BUF_SIZE, target_node_list_p); - numad_log(LOG_DEBUG, "Existing nodes: %s Target nodes: %s\n", buf, buf2); - } + min_req_nodes -= 1; if (EQUAL_LISTS(target_node_list_p, all_nodes_list_p)) { // Apparently we must use all resource nodes... - target_using_all_nodes = 1; break; } -#define MBS_MARGIN 10 + // "Consume" the resources on this node +#define CPUS_MARGIN 0 +#define MBS_MARGIN 100 if (tmp_node[0].MBs_free >= (mbs + MBS_MARGIN)) { tmp_node[0].MBs_free -= mbs; mbs = 0; @@ -1834,7 +2114,6 @@ id_list_p pick_numa_nodes(int pid, int cpus, int mbs, int assume_enough_cpus) { mbs -= (tmp_node[0].MBs_free - MBS_MARGIN); tmp_node[0].MBs_free = MBS_MARGIN; } -#define CPUS_MARGIN 0 if (tmp_node[0].CPUs_free >= (cpus + CPUS_MARGIN)) { tmp_node[0].CPUs_free -= cpus; cpus = 0; @@ -1842,126 +2121,52 @@ id_list_p pick_numa_nodes(int pid, int cpus, int mbs, int assume_enough_cpus) { cpus -= (tmp_node[0].CPUs_free - CPUS_MARGIN); tmp_node[0].CPUs_free = CPUS_MARGIN; } - tmp_node[0].magnitude = tmp_node[0].CPUs_free * tmp_node[0].MBs_free; - } - // If this existing process is already located where we want it, and almost - // all memory is already moved to those nodes, then return NULL indicating - // no need to change binding this time. - if ((pid > 0) && (EQUAL_LISTS(target_node_list_p, existing_mems_list_p))) { - // May not need to change binding. However, if there is any significant - // memory still on non-target nodes, advise the bind anyway because - // there are some scenarios when the kernel will not move it all the - // first time. 
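To make the sizing logic above concrete, here is a worked example with assumed numbers that are not from the patch: one 16384 MB node exposing 8 cores plus 8 hyperthreads at the default -t 20 (so CPUs_total = 8*100 + 8*20 = 960 in numad's scaled units), the default -u 85, and a hypothetical existing process requesting 20000 MB and 1000 hundredths of a CPU.

/* sizing_example.c -- worked cpu_flex / min_req_nodes arithmetic; link -lm */
#include <math.h>
#include <stdio.h>

int main(void) {
    int target_utilization = 85;     /* default -u */
    long node_MBs_total = 16384;     /* assumed node size */
    long node_CPUs_total = 960;      /* (8 * 100) + (8 * 20), scaled by 100 */
    long mbs = 20000, cpus = 1000;   /* assumed request */
    /* existing PIDs may flex away half of the utilization margin */
    long cpu_flex = ((100 - target_utilization) * node_CPUs_total) / 200;
    int mem_req_nodes = (int)ceil((double)mbs / (double)node_MBs_total);
    int cpu_req_nodes = (int)ceil((double)(cpus - cpu_flex) / (double)node_CPUs_total);
    int min_req_nodes = (mem_req_nodes > cpu_req_nodes) ? mem_req_nodes : cpu_req_nodes;
    printf("cpu_flex=%ld mem_nodes=%d cpu_nodes=%d -> min_req_nodes=%d\n",
           cpu_flex, mem_req_nodes, cpu_req_nodes, min_req_nodes);
    /* prints: cpu_flex=72 mem_nodes=2 cpu_nodes=1 -> min_req_nodes=2 */
    return 0;
}

Memory dominates in this example: two nodes are required even though the flexed CPU demand would fit on one.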
@@ -1811,22 +2095,18 @@ id_list_p pick_numa_nodes(int pid, int cpus, int mbs, int assume_enough_cpus) {
             // last one we used.  This is not going to make progress...  So
             // just punt and use everything.
             OR_LISTS(target_node_list_p, target_node_list_p, all_nodes_list_p);
-            target_using_all_nodes = 1;
             break;
         }
         prev_node_used = tmp_node[0].node_id;
         ADD_ID_TO_LIST(tmp_node[0].node_id, target_node_list_p);
-        if (log_level >= LOG_DEBUG) {
-            str_from_id_list(buf,  BUF_SIZE, existing_mems_list_p);
-            str_from_id_list(buf2, BUF_SIZE, target_node_list_p);
-            numad_log(LOG_DEBUG, "Existing nodes: %s  Target nodes: %s\n", buf, buf2);
-        }
+        min_req_nodes -= 1;
         if (EQUAL_LISTS(target_node_list_p, all_nodes_list_p)) {
             // Apparently we must use all resource nodes...
-            target_using_all_nodes = 1;
             break;
         }
-#define MBS_MARGIN 10
+        // "Consume" the resources on this node
+#define CPUS_MARGIN 0
+#define MBS_MARGIN 100
         if (tmp_node[0].MBs_free >= (mbs + MBS_MARGIN)) {
             tmp_node[0].MBs_free -= mbs;
             mbs = 0;
@@ -1834,7 +2114,6 @@ id_list_p pick_numa_nodes(int pid, int cpus, int mbs, int assume_enough_cpus) {
             mbs -= (tmp_node[0].MBs_free - MBS_MARGIN);
             tmp_node[0].MBs_free = MBS_MARGIN;
         }
-#define CPUS_MARGIN 0
         if (tmp_node[0].CPUs_free >= (cpus + CPUS_MARGIN)) {
             tmp_node[0].CPUs_free -= cpus;
             cpus = 0;
@@ -1842,126 +2121,52 @@ id_list_p pick_numa_nodes(int pid, int cpus, int mbs, int assume_enough_cpus) {
             cpus -= (tmp_node[0].CPUs_free - CPUS_MARGIN);
             tmp_node[0].CPUs_free = CPUS_MARGIN;
         }
-        tmp_node[0].magnitude = tmp_node[0].CPUs_free * tmp_node[0].MBs_free;
-    }
-    // If this existing process is already located where we want it, and almost
-    // all memory is already moved to those nodes, then return NULL indicating
-    // no need to change binding this time.
-    if ((pid > 0) && (EQUAL_LISTS(target_node_list_p, existing_mems_list_p))) {
-        // May not need to change binding.  However, if there is any significant
-        // memory still on non-target nodes, advise the bind anyway because
-        // there are some scenarios when the kernel will not move it all the
-        // first time.
-        if (!target_using_all_nodes) {
-            p->dup_bind_count += 1;
-            for (int ix = 0;  (ix < num_nodes);  ix++) {
-                if ((process_MBs[ix] > 10) && (!ID_IS_IN_LIST(ix, target_node_list_p))) {
-                    goto try_memory_move_again;
-                }
-            }
-            // We will accept these memory locations.  Stamp it as done.
-            p->bind_time_stamp = get_time_stamp();
-        }
-        // Skip rebinding either because practically all memory is in the
-        // target nodes, or because we are stuck using all the nodes.
+        tmp_node[0].magnitude = combined_value_of_weighted_resources(0, mbs, cpus, tmp_node[0].MBs_free, tmp_node[0].CPUs_free);
+    }
+
+    // If this existing process is already located where we want it, then just
+    // return NULL indicating no need to change binding this time.
+    if ((pid > 0) && (p->bind_time_stamp) && (EQUAL_LISTS(target_node_list_p, p->node_list_p))) {
         if (log_level >= LOG_DEBUG) {
-            numad_log(LOG_DEBUG, "Skipping evaluation because memory is reasonably situated.\n");
+            numad_log(LOG_DEBUG, "Process %d already bound to target nodes.\n", p->pid);
         }
+        p->bind_time_stamp = get_time_stamp();
         return NULL;
-    } else {
-        // Either a non-existing process, or a new binding for an existing process.
-        if (p != NULL) {
-            // Must be a new binding for an existing process, so reset dup_bind_count.
-            p->dup_bind_count = 0;
-        }
-    }
-    // See if this proposed move will make a significant difference.
-    // If not, return null instead of advising the move.
-    uint64_t target_magnitude = 0;
-    uint64_t existing_magnitude = 0;
-    int num_target_nodes   = NUM_IDS_IN_LIST(target_node_list_p);
-    int num_existing_nodes = NUM_IDS_IN_LIST(existing_mems_list_p);
-    /* FIXME: this expansion seems to cause excessive growth
-     * So calculate the improvement before hastily expanding nodes.
-    if (num_target_nodes > num_existing_nodes) { goto try_memory_move_again; }
-    */
-    int node_id = 0;
-    int n = num_existing_nodes + num_target_nodes;
-    while (n) {
-        if (ID_IS_IN_LIST(node_id, target_node_list_p)) {
-            target_magnitude += saved_magnitude_for_node[node_id];
-            n -= 1;
-        }
-        if (ID_IS_IN_LIST(node_id, existing_mems_list_p)) {
-            existing_magnitude += saved_magnitude_for_node[node_id];
-            n -= 1;
-        }
-        node_id += 1;
-    }
-    if (existing_magnitude > 0) {
-        uint64_t magnitude_change = ((target_magnitude - existing_magnitude) * 100) / existing_magnitude;
-        if (magnitude_change < 0) {
-            magnitude_change = -(magnitude_change);
-        }
-        if (magnitude_change <= IMPROVEMENT_THRESHOLD_PERCENT) {
-            // Not significant enough percentage change to do rebind
-            if (log_level >= LOG_DEBUG) {
-                str_from_id_list(buf,  BUF_SIZE, existing_mems_list_p);
-                str_from_id_list(buf2, BUF_SIZE, target_node_list_p);
-                numad_log(LOG_DEBUG, "Moving pid %d from nodes (%s) to nodes (%s) skipped as insignificant improvement: %ld percent.\n",
-                    pid, buf, buf2, magnitude_change);
-            }
-            // We decided this is almost good enough.  Stamp it as done.
-            p->bind_time_stamp = get_time_stamp();
-            return NULL;
-        }
     }
-    if ((pid <= 0) && (num_target_nodes <= 0)) {
-        // Always provide at least one node for pre-placement advice
+    // Must always provide at least one node for pre-placement advice
+    // FIXME: verify this can happen only if no resources requested...
+    if ((pid <= 0) && (NUM_IDS_IN_LIST(target_node_list_p) <= 0)) {
         ADD_ID_TO_LIST(node[0].node_id, target_node_list_p);
     }
-try_memory_move_again:
-    str_from_id_list(buf,  BUF_SIZE, existing_mems_list_p);
+    // Log advice, and return target node list
+    if ((pid > 0) && (p->bind_time_stamp)) {
+        str_from_id_list(buf, BUF_SIZE, p->node_list_p);
+    } else {
+        str_from_id_list(buf, BUF_SIZE, all_nodes_list_p);
+    }
+    char buf2[BUF_SIZE];
     str_from_id_list(buf2, BUF_SIZE, target_node_list_p);
     char *cmd_name = "(unknown)";
     if ((p) && (p->comm)) {
         cmd_name = p->comm;
     }
     numad_log(LOG_NOTICE, "Advising pid %d %s move from nodes (%s) to nodes (%s)\n", pid, cmd_name, buf, buf2);
+
+    if (pid > 0) {
+        // FIXME: Consider moving this out to caller??
+        COPY_LIST(target_node_list_p, p->node_list_p);
+    }
     return target_node_list_p;
 }
 
-void show_processes(process_data_p *ptr, int nprocs) {
-    time_t ts = time(NULL);
-    fprintf(log_fs, "%s", ctime(&ts));
-    fprintf(log_fs, "Candidates: %d\n", nprocs);
-    for (int ix = 0;  (ix < nprocs);  ix++) {
-        process_data_p p = ptr[ix];
-        char buf[BUF_SIZE];
-        snprintf(buf, BUF_SIZE, "%s%s/cpuset.mems", cpuset_dir, p->cpuset_name);
-        FILE *fs = fopen(buf, "r");
-        buf[0] = '\0';
-        if (fs) {
-            if (fgets(buf, BUF_SIZE, fs)) {
-                ELIM_NEW_LINE(buf);
-            }
-            fclose(fs);
-        }
-        fprintf(log_fs, "%ld: PID %d: %s, Threads %2ld, MBs_used %6ld, CPUs_used %4ld, Magnitude %6ld, Nodes: %s\n",
-            p->data_time_stamp, p->pid, p->comm, p->num_threads, p->MBs_used, p->CPUs_used, p->MBs_used * p->CPUs_used, buf);
-    }
-    fprintf(log_fs, "\n");
-    fflush(log_fs);
-}
-
-
 int manage_loads() {
+    uint64_t time_stamp = get_time_stamp();
     // Use temporary index to access and sort hash table entries
-    static process_data_p *pindex;
     static int pindex_size;
+    static process_data_p *pindex;
     if (pindex_size < process_hash_table_size) {
         pindex_size = process_hash_table_size;
         pindex = realloc(pindex, pindex_size * sizeof(process_data_p));
@@ -1974,34 +2179,69 @@ int manage_loads() {
         return min_interval / 2;
     }
     memset(pindex, 0, pindex_size * sizeof(process_data_p));
-    // Copy live candidate pointers to the index for sorting, etc
+    // Copy live candidate pointers to the index for sorting
+    // if they meet the threshold for memory usage and CPU usage.
     int nprocs = 0;
+    long sum_CPUs_used = 0;
     for (int ix = 0;  (ix < process_hash_table_size);  ix++) {
         process_data_p p = &process_hash_table[ix];
-        if (p->pid) {
+        if ((p->pid) && (p->CPUs_used * p->MBs_used > CPU_THRESHOLD * MEMORY_THRESHOLD)) {
             pindex[nprocs++] = p;
+            sum_CPUs_used += p->CPUs_used;
+            // Initialize node list, if not already done for this process.
+            if (p->node_list_p == NULL) {
+                initialize_mem_node_list(p);
+            }
         }
     }
-    // Sort index by amount of CPU used * amount of memory used.  Not expecting
-    // a long list here.  Use a simple sort -- however, sort into bins,
-    // treating values within 10% as aquivalent.  Within bins, order by
-    // bind_time_stamp so oldest bound will be higher priority to evaluate.
+    // Order candidate considerations using timestamps and magnitude: amount of
+    // CPU used * amount of memory used.  Not expecting a long list here.  Use
+    // a simplistic sort -- however move all not yet bound to front of list and
+    // order by decreasing magnitude.  Previously bound processes follow in
+    // bins of increasing magnitude treating values within 20% as equivalent.
+    // Within bins, order by bind_time_stamp so oldest bound will be higher
+    // priority to evaluate.  Start by moving all unbound to beginning.
+    int num_unbound = 0;
     for (int ij = 0;  (ij < nprocs);  ij++) {
+        if (pindex[ij]->bind_time_stamp == 0) {
+            process_data_p tmp = pindex[num_unbound];
+            pindex[num_unbound++] = pindex[ij];
+            pindex[ij] = tmp;
+        }
+    }
+    // Sort all unbound so biggest magnitude comes first
+    for (int ij = 0;  (ij < num_unbound);  ij++) {
+        int best = ij;
+        for (int ik = ij + 1;  (ik < num_unbound);  ik++) {
+            uint64_t   ik_mag = (pindex[  ik]->CPUs_used * pindex[  ik]->MBs_size);
+            uint64_t best_mag = (pindex[best]->CPUs_used * pindex[best]->MBs_size);
+            if (ik_mag <= best_mag) continue;
+            best = ik;
+        }
+        if (best != ij) {
+            process_data_p tmp = pindex[ij];
+            pindex[ij] = pindex[best];
+            pindex[best] = tmp;
+        }
+    }
+    // Sort the remaining candidates into bins of increasing magnitude, and by
+    // timestamp within bins.
+    for (int ij = num_unbound;  (ij < nprocs);  ij++) {
         int best = ij;
         for (int ik = ij + 1;  (ik < nprocs);  ik++) {
-            uint64_t   ik_mag = (pindex[  ik]->CPUs_used * pindex[  ik]->MBs_used);
-            uint64_t best_mag = (pindex[best]->CPUs_used * pindex[best]->MBs_used);
+            uint64_t   ik_mag = (pindex[  ik]->CPUs_used * pindex[  ik]->MBs_size);
+            uint64_t best_mag = (pindex[best]->CPUs_used * pindex[best]->MBs_size);
             uint64_t  min_mag = ik_mag;
             uint64_t diff_mag = best_mag - ik_mag;
             if (diff_mag < 0) {
                 diff_mag = -(diff_mag);
                 min_mag = best_mag;
             }
-            if ((diff_mag > 0) && (min_mag / diff_mag < 10)) {
-                // difference > 10 percent.  Use strict ordering
-                if (ik_mag <= best_mag) continue;
+            if ((diff_mag > 0) && (min_mag / diff_mag < 5)) {
+                // difference > 20 percent.  Use magnitude ordering
+                if (ik_mag >= best_mag) continue;
             } else {
-                // difference within 10 percent.  Sort these by bind_time_stamp.
+                // difference within 20 percent.  Sort these by bind_time_stamp.
                 if (pindex[ik]->bind_time_stamp > pindex[best]->bind_time_stamp) continue;
             }
             best = ik;
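The bin comparison above uses an integer shortcut: two magnitudes are "within 20 percent" exactly when the smaller one divided by their difference is at least 5.  The sketch below isolates that predicate.  It orders the operands before subtracting, which sidesteps a quirk visible in the context lines above: diff_mag is a uint64_t, so the (diff_mag < 0) test can never fire, and an underflowed difference simply falls into the magnitude-ordering branch.

    #include <stdint.h>
    #include <stdio.h>

    // Same bin when the difference is at most one fifth (20%) of the
    // smaller magnitude; diff == 0 is trivially equivalent.
    static int within_20_percent(uint64_t a, uint64_t b) {
        uint64_t min_mag  = (a < b) ? a : b;
        uint64_t diff_mag = (a < b) ? (b - a) : (a - b);
        return (diff_mag == 0) || (min_mag / diff_mag >= 5);
    }

    int main(void) {
        printf("%d\n", within_20_percent(100, 119));  // 1: same bin, order by bind_time_stamp
        printf("%d\n", within_20_percent(100, 130));  // 0: ordered by magnitude instead
        return 0;
    }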
@@ -2012,23 +2252,69 @@ int manage_loads() {
             pindex[best] = tmp;
         }
     }
+    // Show the candidate processes in the log file
     if ((log_level >= LOG_INFO) && (nprocs > 0)) {
-        show_processes(pindex, nprocs);
+        numad_log(LOG_INFO, "Candidates: %d\n", nprocs);
+        for (int ix = 0;  (ix < nprocs);  ix++) {
+            process_data_p p = pindex[ix];
+            char buf[BUF_SIZE];
+            str_from_id_list(buf, BUF_SIZE, p->node_list_p);
+            fprintf(log_fs, "%ld: PID %d: %s, Threads %2ld, MBs_size %6ld, MBs_used %6ld, CPUs_used %4ld, Magnitude %6ld, Nodes: %s\n",
+                p->data_time_stamp, p->pid, p->comm, p->num_threads, p->MBs_size, p->MBs_used, p->CPUs_used, p->MBs_used * p->CPUs_used, buf);
+        }
+        fflush(log_fs);
     }
-    // Estimate desired size and make resource requests for each significant process
+    // Estimate desired size (+ margin capacity) and
+    // make resource requests for each candidate process
     for (int ix = 0;  (ix < nprocs);  ix++) {
         process_data_p p = pindex[ix];
-        if (p->CPUs_used * p->MBs_used < CPU_THRESHOLD * MEMORY_THRESHOLD) {
-            break; // No more significant processes worth worrying about...
-        }
-        int mb_request  = (p->MBs_used * 100) / target_utilization;
-        int cpu_request = (p->CPUs_used * 100) / target_utilization;
-        // Do not give a process more CPUs than it has threads!
-        // FIXME: For guest VMs, should limit max to VCPU threads.  Will
-        // need to do something more intelligent with guest IO threads
-        // when eventually considering devices and IRQs.
+        // If this process was recently bound, enforce a three-minute minimum
+        // delay between repeated attempts to potentially move the process.
+        // FIXME: make this delay contingent on node resource equity?  Or,
+        // maybe change in running averages?  Perhaps detect change in averages,
+        // or look at stddev?  What is a good range for the delay?  Discrete or
+        // continuous?
+#define MIN_DELAY_FOR_REEVALUATION (180 * ONE_HUNDRED)
+        if (p->bind_time_stamp + MIN_DELAY_FOR_REEVALUATION > time_stamp) {
+            // Skip re-evaluation because we just did it recently.
+            if (log_level >= LOG_DEBUG) {
+                numad_log(LOG_DEBUG, "Skipping evaluation of PID %d because done too recently.\n", p->pid);
+            }
+            continue;
+        }
+        // If this process has interleaved memory, recheck it only every 30 minutes...
+#define MIN_DELAY_FOR_INTERLEAVE (1800 * ONE_HUNDRED)
+        if (((p->flags & PROCESS_FLAG_INTERLEAVED) > 0)
+          && (p->bind_time_stamp + MIN_DELAY_FOR_INTERLEAVE > time_stamp)) {
+            if (log_level >= LOG_DEBUG) {
+                numad_log(LOG_DEBUG, "Skipping evaluation of PID %d because of interleaved memory.\n", p->pid);
+            }
+            continue;
+        }
+        // Expand resources needed estimate using target_utilization factor.
+        // Start with the CPUs actually used (capped by number of threads) for
+        // CPUs required, but use the process virtual memory size for MBs
+        // requirement.  (We previously used the RSS for MBs needed, but that
+        // caused problems with processes that had quickly expanding memory
+        // usage which also needed to cross NUMA boundaries.  The downside of
+        // this choice is we might not pack processes as tightly as possible
+        // anymore.  Hopefully this will be a relatively rare occurrence in
+        // practice.  KVM guests should not be significantly over-provisioned
+        // with memory they will never use!)
+        int mem_target_utilization = target_utilization;
+        int cpu_target_utilization = target_utilization;
+        // Cap memory utilization at 100 percent (but allow CPUs to oversubscribe)
+        if (mem_target_utilization > 100) {
+            mem_target_utilization = 100;
+        }
+        int  mb_request = (p->MBs_size  * 100) / mem_target_utilization;
+        int cpu_request = (p->CPUs_used * 100) / cpu_target_utilization;
+        // But do not give a process more CPUs than it has threads!
         int thread_limit = p->num_threads;
-        // If process looks like a KVM guest, try to limit to number of vCPU threads
+        // If process looks like a KVM guest, try to limit thread count to the
+        // number of vCPU threads.  FIXME: Will need to do something more
+        // intelligent than this with guest IO threads when eventually
+        // considering devices and IRQs.
         if ((p->comm) && (p->comm[0] == '(') && (p->comm[1] == 'q') && (strcmp(p->comm, "(qemu-kvm)") == 0)) {
             int kvm_vcpu_threads = get_num_kvm_vcpu_threads(p->pid);
             if (thread_limit > kvm_vcpu_threads) {
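The split between mem_target_utilization and cpu_target_utilization is what makes the new -u range up to 130 safe: only the CPU request is divided by an oversubscribed target.  A worked example with hypothetical numbers (CPU amounts scaled by 100, as elsewhere in numad):

    #include <stdio.h>

    int main(void) {
        long MBs_size  = 8500;  // hypothetical process virtual size, in MB
        long CPUs_used = 250;   // hypothetical usage: 2.50 CPUs
        int target_utilization = 110;  // oversubscribed, e.g. via -u 110
        int mem_target_utilization = target_utilization;
        int cpu_target_utilization = target_utilization;
        if (mem_target_utilization > 100) {
            mem_target_utilization = 100;  // memory never oversubscribes
        }
        long mb_request  = (MBs_size  * 100) / mem_target_utilization;
        long cpu_request = (CPUs_used * 100) / cpu_target_utilization;
        // mb_request stays 8500 (capped at 100%); cpu_request shrinks to 227,
        // so CPU demand packs tighter than the raw usage would suggest.
        printf("mb_request: %ld  cpu_request: %ld\n", mb_request, cpu_request);
        return 0;
    }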
@@ -2039,24 +2325,18 @@
         if (cpu_request > thread_limit) {
             cpu_request = thread_limit;
         }
-        long average_total_cpus = 0;
-        for (int ix = 0;  (ix < num_nodes);  ix++) {
-            average_total_cpus += node[ix].CPUs_total;
-        }
-        average_total_cpus /= num_nodes;
-        int assume_enough_cpus = (cpu_request <= average_total_cpus);
+        // OK, now pick NUMA nodes for this process and bind it!
         pthread_mutex_lock(&node_info_mutex);
+        int assume_enough_cpus = (sum_CPUs_used <= sum_CPUs_total);
         id_list_p node_list_p = pick_numa_nodes(p->pid, cpu_request, mb_request, assume_enough_cpus);
-        // FIXME: ?? copy node_list_p to shorten mutex region?
-        if ((node_list_p != NULL) && (bind_process_and_migrate_memory(p->pid, p->cpuset_name, node_list_p, NULL))) {
-            // Shorten interval if actively moving processes
+        if ((node_list_p != NULL) && (bind_process_and_migrate_memory(p))) {
             pthread_mutex_unlock(&node_info_mutex);
-            p->bind_time_stamp = get_time_stamp();
+            // Return minimum interval when actively moving processes
             return min_interval;
         }
         pthread_mutex_unlock(&node_info_mutex);
     }
-    // Return maximum interval if no process movement
+    // Return maximum interval when no process movement
     return max_interval;
 }
 
@@ -2070,6 +2350,14 @@ void *set_dynamic_options(void *arg) {
         msg_t msg;
         recv_msg(&msg);
         switch (msg.body.cmd) {
+        case 'C':
+            use_inactive_file_cache = (msg.body.arg1 != 0);
+            if (use_inactive_file_cache) {
+                numad_log(LOG_NOTICE, "Counting inactive file cache as available\n");
+            } else {
+                numad_log(LOG_NOTICE, "Counting inactive file cache as unavailable\n");
+            }
+            break;
         case 'H':
             thp_scan_sleep_ms = msg.body.arg1;
             set_thp_scan_sleep_ms(thp_scan_sleep_ms);
@@ -2116,6 +2404,11 @@ void *set_dynamic_options(void *arg) {
                 numad_log(LOG_NOTICE, "Scanning only explicit PID list processes\n");
             }
             break;
+        case 't':
+            numad_log(LOG_NOTICE, "Changing logical CPU thread percent to %d\n", msg.body.arg1);
+            htt_percent = msg.body.arg1;
+            node_info_time_stamp = 0; // to force rescan of nodes/cpus soon
+            break;
         case 'u':
             numad_log(LOG_NOTICE, "Changing target utilization to %d\n", msg.body.arg1);
             target_utilization = msg.body.arg1;
@@ -2195,6 +2488,7 @@ void parse_two_arg_values(char *p, int *first_ptr, int *second_ptr, int first_is
 
 int main(int argc, char *argv[]) {
     int opt;
+    int C_flag = 0;
     int d_flag = 0;
     int H_flag = 0;
     int i_flag = 0;
@@ -2203,14 +2497,19 @@ int main(int argc, char *argv[]) {
     int p_flag = 0;
     int r_flag = 0;
     int S_flag = 0;
+    int t_flag = 0;
     int u_flag = 0;
     int v_flag = 0;
     int w_flag = 0;
     int x_flag = 0;
-    int tmp_ms = 0;
+    int tmp_int = 0;
    long list_pid = 0;
-    while ((opt = getopt(argc, argv, "dD:hH:i:K:l:p:r:R:S:u:vVw:x:")) != -1) {
+    while ((opt = getopt(argc, argv, "C:dD:hH:i:K:l:p:r:R:S:t:u:vVw:x:")) != -1) {
         switch (opt) {
+        case 'C':
+            C_flag = 1;
+            use_inactive_file_cache = (atoi(optarg) != 0);
+            break;
         case 'd':
             d_flag = 1;
             log_level = LOG_DEBUG;
@@ -2222,10 +2521,11 @@ int main(int argc, char *argv[]) {
             print_usage_and_exit(argv[0]);
             break;
         case 'H':
-            tmp_ms = atoi(optarg);
-            if ((tmp_ms > 9) && (tmp_ms < 1000001)) {
+            tmp_int = atoi(optarg);
+            if ((tmp_int == 0) || ((tmp_int > 9) && (tmp_int < 1000001))) {
+                // 0 means do not change the system default value
                 H_flag = 1;
-                thp_scan_sleep_ms = tmp_ms;
+                thp_scan_sleep_ms = tmp_int;
             } else {
                 fprintf(stderr, "THP scan_sleep_ms must be > 9 and < 1000001\n");
                 exit(EXIT_FAILURE);
@@ -2263,9 +2563,19 @@ int main(int argc, char *argv[]) {
             S_flag = 1;
             scan_all_processes = (atoi(optarg) != 0);
             break;
+        case 't':
+            tmp_int = atoi(optarg);
+            if ((tmp_int >= 0) && (tmp_int <= 100)) {
+                t_flag = 1;
+                htt_percent = tmp_int;
+            }
+            break;
         case 'u':
-            u_flag = 1;
-            target_utilization = atoi(optarg);
+            tmp_int = atoi(optarg);
+            if ((tmp_int >= 10) && (tmp_int <= 130)) {
+                u_flag = 1;
+                target_utilization = tmp_int;
+            }
             break;
         case 'v':
             v_flag = 1;
@@ -2302,15 +2612,6 @@ int main(int argc, char *argv[]) {
     open_log_file();
     init_msg_queue();
     num_cpus = get_num_cpus();
-    if (reserved_cpu_str != NULL) {
-        char buf[BUF_SIZE];
-        CLEAR_LIST(reserved_cpu_mask_list_p);
-        int n = add_ids_to_list_from_str(reserved_cpu_mask_list_p, reserved_cpu_str);
-        str_from_id_list(buf, BUF_SIZE, reserved_cpu_mask_list_p);
-        numad_log(LOG_NOTICE, "Reserving %d CPUs (%s) for non-numad use\n", n, buf);
-        // turn reserved list into a negated mask for later ANDing use...
-        negate_list(reserved_cpu_mask_list_p);
-    }
     page_size_in_bytes = sysconf(_SC_PAGESIZE);
     huge_page_size_in_bytes = get_huge_page_size_in_bytes();
     // Figure out if this is the daemon, or a subsequent invocation
@@ -2319,6 +2620,9 @@ int main(int argc, char *argv[]) {
         // Daemon is already running.  So send dynamic options to persistant
         // thread to handle requests, get the response (if any), and finish.
         msg_t msg;
+        if (C_flag) {
+            send_msg(daemon_pid, 'C', use_inactive_file_cache, 0, "");
+        }
         if (H_flag) {
             send_msg(daemon_pid, 'H', thp_scan_sleep_ms, 0, "");
         }
@@ -2340,6 +2644,9 @@ int main(int argc, char *argv[]) {
         if (S_flag) {
             send_msg(daemon_pid, 'S', scan_all_processes, 0, "");
         }
+        if (t_flag) {
+            send_msg(daemon_pid, 't', htt_percent, 0, "");
+        }
         if (u_flag) {
             send_msg(daemon_pid, 'u', target_utilization, 0, "");
         }
@@ -2351,14 +2658,30 @@ int main(int argc, char *argv[]) {
         if (x_flag) {
             send_msg(daemon_pid, 'x', list_pid, 0, "");
         }
-    } else if (w_flag) {
-        // Get pre-placement NUMA advice without starting daemon
+        close_log_file();
+        exit(EXIT_SUCCESS);
+    }
+    // No numad daemon running yet.
+    // First, make note of any reserved CPUs....
+    if (reserved_cpu_str != NULL) {
+        CLEAR_CPU_LIST(reserved_cpu_mask_list_p);
+        int n = add_ids_to_list_from_str(reserved_cpu_mask_list_p, reserved_cpu_str);
         char buf[BUF_SIZE];
+        str_from_id_list(buf, BUF_SIZE, reserved_cpu_mask_list_p);
+        numad_log(LOG_NOTICE, "Reserving %d CPUs (%s) for non-numad use\n", n, buf);
+        // turn reserved list into a negated mask for later ANDing use...
+        negate_list(reserved_cpu_mask_list_p);
+    }
+    // If it is a "-w" pre-placement request, handle that without starting
+    // the daemon.  Otherwise start the numad daemon.
+    if (w_flag) {
+        // Get pre-placement NUMA advice without starting daemon
         update_nodes();
         sleep(2);
         update_nodes();
         numad_log(LOG_NOTICE, "Getting NUMA pre-placement advice for %d CPUs and %d MBs\n", requested_cpus, requested_mbs);
         id_list_p node_list_p = pick_numa_nodes(-1, requested_cpus, requested_mbs, 0);
+        char buf[BUF_SIZE];
         str_from_id_list(buf, BUF_SIZE, node_list_p);
         fprintf(stdout, "%s\n", buf);
         close_log_file();
@@ -2366,6 +2689,7 @@ int main(int argc, char *argv[]) {
     } else if (max_interval > 0) {
         // Start the numad daemon...
         check_prereqs(argv[0]);
+#if (!NO_DAEMON)
         // Daemonize self...
         daemon_pid = fork();
         if (daemon_pid < 0) { numad_log(LOG_CRIT, "fork() failed\n");  exit(EXIT_FAILURE); }
@@ -2386,9 +2710,21 @@ int main(int argc, char *argv[]) {
         if (log_fs != stderr) {
             fclose(stderr);
         }
+#endif
+        // Set up signal handlers
+        struct sigaction sa;
+        memset(&sa, 0, sizeof(sa));
+        sa.sa_handler = sig_handler;
+        if (sigaction(SIGHUP, &sa, NULL)
+            || sigaction(SIGTERM, &sa, NULL)
+            || sigaction(SIGQUIT, &sa, NULL)) {
+            numad_log(LOG_CRIT, "sigaction does not work?\n");
+            exit(EXIT_FAILURE);
+        }
         // Allocate initial process hash table
         process_hash_table_expand();
-        // Spawn thread to handle messages from subsequent invocation requests
+        // Spawn a thread to handle messages from subsequent invocation requests
+        // and also a lazy background thread to clean up obsolete cpusets.
         pthread_mutex_init(&pid_list_mutex, NULL);
         pthread_mutex_init(&node_info_mutex, NULL);
         pthread_attr_t attr;
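The sigaction() setup above pairs with the new got_sighup/got_sigterm/got_sigquit flags: the handler only records the signal, and the main loop acts on it between scans.  A minimal standalone version of the same pattern, using volatile sig_atomic_t, the portable type for such flags (the daemon's own flags are plain ints):

    #include <signal.h>
    #include <stdio.h>
    #include <string.h>
    #include <unistd.h>

    static volatile sig_atomic_t got_signal = 0;

    static void handler(int signum) {
        got_signal = signum;  // async-signal-safe: just record the signal
    }

    int main(void) {
        struct sigaction sa;
        memset(&sa, 0, sizeof(sa));
        sa.sa_handler = handler;
        if (sigaction(SIGHUP, &sa, NULL) || sigaction(SIGTERM, &sa, NULL)) {
            perror("sigaction");
            return 1;
        }
        while (!got_signal) {
            sleep(1);  // stand-in for the update_nodes() / manage_loads() work
        }
        printf("caught signal %d, shutting down cleanly\n", (int)got_signal);
        return 0;
    }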
@@ -2398,7 +2734,11 @@
     }
     pthread_t tid;
     if (pthread_create(&tid, &attr, &set_dynamic_options, &tid) != 0) {
-        numad_log(LOG_CRIT, "pthread_create failure\n");
+        numad_log(LOG_CRIT, "pthread_create failure: setting thread\n");
+        exit(EXIT_FAILURE);
+    }
+    if (pthread_create(&tid, &attr, &clean_obsolete_cpusets, &tid) != 0) {
+        numad_log(LOG_CRIT, "pthread_create failure: cleaning thread\n");
         exit(EXIT_FAILURE);
     }
     // Loop here forwever...
@@ -2412,14 +2752,20 @@
             interval = manage_loads();
         }
         sleep(interval);
+        if (got_sigterm | got_sigquit) {
+            shut_down_numad();
+        }
+        if (got_sighup) {
+            got_sighup = 0;
+            close_log_file();
+            open_log_file();
+        }
     }
     if (pthread_attr_destroy(&attr) != 0) {
         numad_log(LOG_WARNING, "pthread_attr_destroy failure\n");
     }
     pthread_mutex_destroy(&pid_list_mutex);
     pthread_mutex_destroy(&node_info_mutex);
-    } else {
-        shut_down_numad();
     }
     exit(EXIT_SUCCESS);
 }
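For reference, the new -t option feeds htt_percent into the CPU capacity recomputed when the nodes are rescanned (note the node_info_time_stamp reset in the 't' message case above).  That capacity computation is outside this diff, so the following is only a hedged illustration of the man page's default of counting a logical CPU as 20 percent of a dedicated core; the formula and the core counts here are assumptions, not code from numad.

    #include <stdio.h>

    int main(void) {
        int cores = 8;             // hypothetical physical cores in one node
        int threads_per_core = 2;  // hypothetical SMT siblings per core
        int htt_percent = 20;      // the -t default
        // Assumed model: one full credit per core, plus a fractional
        // credit for each extra hardware thread (hundredths of a CPU).
        int capacity = cores * 100 + cores * (threads_per_core - 1) * htt_percent;
        // Prints: node CPU capacity: 960 (9.60 CPUs)
        printf("node CPU capacity: %d (%d.%02d CPUs)\n",
            capacity, capacity / 100, capacity % 100);
        return 0;
    }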