803d1bd34c
Resolves: bz#1493085 bz#1518710 bz#1554255 bz#1558948 bz#1558989
Resolves: bz#1559452 bz#1567001 bz#1569312 bz#1569951 bz#1575539
Resolves: bz#1575557 bz#1577051 bz#1580120 bz#1581184 bz#1581553
Resolves: bz#1581647 bz#1582119 bz#1582129 bz#1582417 bz#1583047
Resolves: bz#1588408 bz#1592666 bz#1594658
Signed-off-by: Milind Changire <mchangir@redhat.com>
From d354eb1abb2160495e205c87e1b2ecd8778c70ed Mon Sep 17 00:00:00 2001
From: Kotresh HR <khiremat@redhat.com>
Date: Thu, 21 Sep 2017 18:11:15 -0400
Subject: [PATCH 292/305] geo-rep: Fix rename of directory in hybrid crawl

In hybrid crawl, renames and unlinks can't be
synced, but directory renames can be detected.
While syncing a directory on the slave, if the
gfid already exists, the operation is actually a
rename. Hence, if the directory gfid already
exists, rename it.

Backport of:
> Patch: https://review.gluster.org/18448
> Change-Id: Ibf9f99e76a3e02795a3c2befd8cac48a5c365bb6
> BUG: 1499566
> Signed-off-by: Kotresh HR <khiremat@redhat.com>

Change-Id: Ibf9f99e76a3e02795a3c2befd8cac48a5c365bb6
BUG: 1582417
Signed-off-by: Kotresh HR <khiremat@redhat.com>
Reviewed-on: https://code.engineering.redhat.com/gerrit/140285
Tested-by: RHGS Build Bot <nigelb@redhat.com>
Reviewed-by: Sunil Kumar Heggodu Gopala Acharya <sheggodu@redhat.com>
---
 geo-replication/syncdaemon/gsyncd.py | 4 +-
 geo-replication/syncdaemon/monitor.py | 85 +----------
 geo-replication/syncdaemon/resource.py | 191 +++++--------------------
 geo-replication/syncdaemon/syncdutils.py | 237 +++++++++++++++++++++++++++++++
 4 files changed, 276 insertions(+), 241 deletions(-)

diff --git a/geo-replication/syncdaemon/gsyncd.py b/geo-replication/syncdaemon/gsyncd.py
index 629e8b7..b0ed0ae 100644
--- a/geo-replication/syncdaemon/gsyncd.py
+++ b/geo-replication/syncdaemon/gsyncd.py
@@ -39,7 +39,7 @@ from changelogagent import agent, Changelog
from gsyncdstatus import set_monitor_status, GeorepStatus, human_time_utc
from libcxattr import Xattr
import struct
-from syncdutils import get_master_and_slave_data_from_args, lf
+from syncdutils import get_master_and_slave_data_from_args, lf, Popen

ParseError = XET.ParseError if hasattr(XET, 'ParseError') else SyntaxError

@@ -778,7 +778,7 @@ def main_i():
else:
gconf.label = 'slave'
startup(go_daemon=go_daemon, log_file=log_file, label=gconf.label)
- resource.Popen.init_errhandler()
+ Popen.init_errhandler()

if be_agent:
os.setsid()
diff --git a/geo-replication/syncdaemon/monitor.py b/geo-replication/syncdaemon/monitor.py
index 0f43c4f..55f8330 100644
--- a/geo-replication/syncdaemon/monitor.py
+++ b/geo-replication/syncdaemon/monitor.py
@@ -16,7 +16,7 @@ import logging
import uuid
import xml.etree.ElementTree as XET
from subprocess import PIPE
-from resource import Popen, FILE, GLUSTER, SSH
+from resource import FILE, GLUSTER, SSH
from threading import Lock
from errno import ECHILD, ESRCH
import re
@@ -24,8 +24,9 @@ import random
from gconf import gconf
from syncdutils import select, waitpid, errno_wrap, lf
from syncdutils import set_term_handler, is_host_local, GsyncdError
-from syncdutils import escape, Thread, finalize, memoize, boolify
+from syncdutils import escape, Thread, finalize, boolify
from syncdutils import gf_event, EVENT_GEOREP_FAULTY
+from syncdutils import Volinfo, Popen

from gsyncdstatus import GeorepStatus, set_monitor_status

@@ -91,86 +92,6 @@ def get_slave_bricks_status(host, vol):
return list(up_hosts)


-class Volinfo(object):
-
- def __init__(self, vol, host='localhost', prelude=[]):
- po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host,
- 'volume', 'info', vol],
- stdout=PIPE, stderr=PIPE)
- vix = po.stdout.read()
- po.wait()
- po.terminate_geterr()
- vi = XET.fromstring(vix)
- if vi.find('opRet').text != '0':
- if prelude:
- via = '(via %s) ' % prelude.join(' ')
- else:
- via = ' '
- raise GsyncdError('getting volume info of %s%s '
- 'failed with errorcode %s' %
- (vol, via, vi.find('opErrno').text))
- self.tree = vi
- self.volume = vol
- self.host = host
-
- def get(self, elem):
- return self.tree.findall('.//' + elem)
-
- def is_tier(self):
- return (self.get('typeStr')[0].text == 'Tier')
-
- def is_hot(self, brickpath):
- logging.debug('brickpath: ' + repr(brickpath))
- return brickpath in self.hot_bricks
-
- @property
- @memoize
- def bricks(self):
- def bparse(b):
- host, dirp = b.find("name").text.split(':', 2)
- return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text}
- return [bparse(b) for b in self.get('brick')]
-
- @property
- @memoize
- def uuid(self):
- ids = self.get('id')
- if len(ids) != 1:
- raise GsyncdError("volume info of %s obtained from %s: "
- "ambiguous uuid" % (self.volume, self.host))
- return ids[0].text
-
- def replica_count(self, tier, hot):
- if (tier and hot):
- return int(self.get('hotBricks/hotreplicaCount')[0].text)
- elif (tier and not hot):
- return int(self.get('coldBricks/coldreplicaCount')[0].text)
- else:
- return int(self.get('replicaCount')[0].text)
-
- def disperse_count(self, tier, hot):
- if (tier and hot):
- # Tiering doesn't support disperse volume as hot brick,
- # hence no xml output, so returning 0. In case, if it's
- # supported later, we should change here.
- return 0
- elif (tier and not hot):
- return int(self.get('coldBricks/colddisperseCount')[0].text)
- else:
- return int(self.get('disperseCount')[0].text)
-
- @property
- @memoize
- def hot_bricks(self):
- return [b.text for b in self.get('hotBricks/brick')]
-
- def get_hot_bricks_count(self, tier):
- if (tier):
- return int(self.get('hotBricks/hotbrickCount')[0].text)
- else:
- return 0
-
-
class Monitor(object):

"""class which spawns and manages gsyncd workers"""
diff --git a/geo-replication/syncdaemon/resource.py b/geo-replication/syncdaemon/resource.py
index d6618c1..c4b5b53 100644
--- a/geo-replication/syncdaemon/resource.py
+++ b/geo-replication/syncdaemon/resource.py
@@ -13,21 +13,16 @@ import os
import sys
import stat
import time
-import signal
import fcntl
-import errno
import types
import struct
import socket
import logging
import tempfile
-import threading
import subprocess
import errno
from errno import EEXIST, ENOENT, ENODATA, ENOTDIR, ELOOP, EACCES
from errno import EISDIR, ENOTEMPTY, ESTALE, EINVAL, EBUSY, EPERM
-from select import error as SelectError
-import shutil

from gconf import gconf
import repce
@@ -43,7 +38,7 @@ from syncdutils import CHANGELOG_AGENT_CLIENT_VERSION
from syncdutils import GX_GFID_CANONICAL_LEN
from gsyncdstatus import GeorepStatus
from syncdutils import get_master_and_slave_data_from_args
-from syncdutils import lf
+from syncdutils import lf, Popen, sup, Volinfo
from syncdutils import Xattr, matching_disk_gfid, get_gfid_from_mnt

UrlRX = re.compile('\A(\w+)://([^ *?[]*)\Z')
@@ -52,14 +47,9 @@ UserRX = re.compile("[\w!\#$%&'*+-\/=?^_`{|}~]+")

ENOTSUP = getattr(errno, 'ENOTSUP', 'EOPNOTSUPP')

-def sup(x, *a, **kw):
- """a rubyesque "super" for python ;)
-
- invoke caller method in parent class with given args.
- """
- return getattr(super(type(x), x),
- sys._getframe(1).f_code.co_name)(*a, **kw)
-
+slv_volume = None
+slv_host = None
+slv_bricks = None

def desugar(ustr):
"""transform sugared url strings to standard <scheme>://<urlbody> form
@@ -114,149 +104,6 @@ def parse_url(ustr):
return getattr(this, sch.upper())(path)


-class Popen(subprocess.Popen):
-
- """customized subclass of subprocess.Popen with a ring
- buffer for children error output"""
-
- @classmethod
- def init_errhandler(cls):
- """start the thread which handles children's error output"""
- cls.errstore = {}
-
- def tailer():
- while True:
- errstore = cls.errstore.copy()
- try:
- poe, _, _ = select(
- [po.stderr for po in errstore], [], [], 1)
- except (ValueError, SelectError):
- # stderr is already closed wait for some time before
- # checking next error
- time.sleep(0.5)
- continue
- for po in errstore:
- if po.stderr not in poe:
- continue
- po.lock.acquire()
- try:
- if po.on_death_row:
- continue
- la = errstore[po]
- try:
- fd = po.stderr.fileno()
- except ValueError: # file is already closed
- time.sleep(0.5)
- continue
-
- try:
- l = os.read(fd, 1024)
- except OSError:
- time.sleep(0.5)
- continue
-
- if not l:
- continue
- tots = len(l)
- for lx in la:
- tots += len(lx)
- while tots > 1 << 20 and la:
- tots -= len(la.pop(0))
- la.append(l)
- finally:
- po.lock.release()
- t = syncdutils.Thread(target=tailer)
- t.start()
- cls.errhandler = t
-
- @classmethod
- def fork(cls):
- """fork wrapper that restarts errhandler thread in child"""
- pid = os.fork()
- if not pid:
- cls.init_errhandler()
- return pid
-
- def __init__(self, args, *a, **kw):
- """customizations for subprocess.Popen instantiation
-
- - 'close_fds' is taken to be the default
- - if child's stderr is chosen to be managed,
- register it with the error handler thread
- """
- self.args = args
- if 'close_fds' not in kw:
- kw['close_fds'] = True
- self.lock = threading.Lock()
- self.on_death_row = False
- self.elines = []
- try:
- sup(self, args, *a, **kw)
- except:
- ex = sys.exc_info()[1]
- if not isinstance(ex, OSError):
- raise
- raise GsyncdError("""execution of "%s" failed with %s (%s)""" %
- (args[0], errno.errorcode[ex.errno],
- os.strerror(ex.errno)))
- if kw.get('stderr') == subprocess.PIPE:
- assert(getattr(self, 'errhandler', None))
- self.errstore[self] = []
-
- def errlog(self):
- """make a log about child's failure event"""
- logging.error(lf("command returned error",
- cmd=" ".join(self.args),
- error=self.returncode))
- lp = ''
-
- def logerr(l):
- logging.error(self.args[0] + "> " + l)
- for l in self.elines:
- ls = l.split('\n')
- ls[0] = lp + ls[0]
- lp = ls.pop()
- for ll in ls:
- logerr(ll)
- if lp:
- logerr(lp)
-
- def errfail(self):
- """fail nicely if child did not terminate with success"""
- self.errlog()
- syncdutils.finalize(exval=1)
-
- def terminate_geterr(self, fail_on_err=True):
- """kill child, finalize stderr harvesting (unregister
- from errhandler, set up .elines), fail on error if
- asked for
- """
- self.lock.acquire()
- try:
- self.on_death_row = True
- finally:
- self.lock.release()
- elines = self.errstore.pop(self)
- if self.poll() is None:
- self.terminate()
- if self.poll() is None:
- time.sleep(0.1)
- self.kill()
- self.wait()
- while True:
- if not select([self.stderr], [], [], 0.1)[0]:
- break
- b = os.read(self.stderr.fileno(), 1024)
- if b:
- elines.append(b)
- else:
- break
- self.stderr.close()
- self.elines = elines
- if fail_on_err and self.returncode != 0:
- self.errfail()
-
-
class Server(object):

"""singleton implemening those filesystem access primitives
@@ -776,6 +623,31 @@ class Server(object):
if isinstance(st, int):
blob = entry_pack_mkdir(
gfid, bname, e['mode'], e['uid'], e['gid'])
+ else:
+ # If gfid of a directory exists on slave but path based
+ # create is getting EEXIST. This means the directory is
+ # renamed in master but recorded as MKDIR during hybrid
+ # crawl. Get the directory path by reading the backend
+ # symlink and trying to rename to new name as said by
+ # master.
+ global slv_bricks
+ global slv_volume
+ global slv_host
+ if not slv_bricks:
+ slv_info = Volinfo (slv_volume, slv_host)
+ slv_bricks = slv_info.bricks
+ # Result of readlink would be of format as below.
+ # readlink = "../../pgfid[0:2]/pgfid[2:4]/pgfid/basename"
+ realpath = os.readlink(os.path.join(slv_bricks[0]['dir'],
+ ".glusterfs", gfid[0:2],
+ gfid[2:4], gfid))
+ realpath_parts = realpath.split('/')
+ src_pargfid = realpath_parts[-2]
+ src_basename = realpath_parts[-1]
+ src_entry = os.path.join(pfx, src_pargfid, src_basename)
+ logging.info(lf("Special case: rename on mkdir",
+ gfid=gfid, entry=repr(entry)))
+ rename_with_disk_gfid_confirmation(gfid, src_entry, entry)
elif op == 'LINK':
slink = os.path.join(pfx, gfid)
st = lstat(slink)
@@ -1318,6 +1190,11 @@ class GLUSTER(AbstractUrl, SlaveLocal, SlaveRemote):
def __init__(self, path):
self.host, self.volume = sup(self, path, '^(%s):(.+)' % HostRX.pattern)

+ global slv_volume
+ global slv_host
+ slv_volume = self.volume
+ slv_host = self.host
+
def canonical_path(self):
return ':'.join([gethostbyname(self.host), self.volume])

diff --git a/geo-replication/syncdaemon/syncdutils.py b/geo-replication/syncdaemon/syncdutils.py
index 2b57f83..a493c37 100644
--- a/geo-replication/syncdaemon/syncdutils.py
+++ b/geo-replication/syncdaemon/syncdutils.py
@@ -16,13 +16,18 @@ import fcntl
import shutil
import logging
import socket
+import errno
+import threading
import subprocess
+from subprocess import PIPE
from threading import Lock, Thread as baseThread
from errno import EACCES, EAGAIN, EPIPE, ENOTCONN, ECONNABORTED
from errno import EINTR, ENOENT, EPERM, ESTALE, EBUSY, errorcode
from signal import signal, SIGTERM
import select as oselect
from os import waitpid as owaitpid
+import xml.etree.ElementTree as XET
+from select import error as SelectError

from conf import GLUSTERFS_LIBEXECDIR, UUID_FILE
sys.path.insert(1, GLUSTERFS_LIBEXECDIR)
@@ -76,6 +81,15 @@ NEWLINE_ESCAPE_CHAR = "%0A"
PERCENTAGE_ESCAPE_CHAR = "%25"


+def sup(x, *a, **kw):
+ """a rubyesque "super" for python ;)
+
+ invoke caller method in parent class with given args.
+ """
+ return getattr(super(type(x), x),
+ sys._getframe(1).f_code.co_name)(*a, **kw)
+
+
def escape(s):
"""the chosen flavor of string escaping, used all over
to turn whatever data to creatable representation"""
@@ -650,3 +664,226 @@ def lf(event, **kwargs):
for k, v in kwargs.items():
msg += "\t{0}={1}".format(k, v)
return msg
+
+
+class Popen(subprocess.Popen):
+
+ """customized subclass of subprocess.Popen with a ring
+ buffer for children error output"""
+
+ @classmethod
+ def init_errhandler(cls):
+ """start the thread which handles children's error output"""
+ cls.errstore = {}
+
+ def tailer():
+ while True:
+ errstore = cls.errstore.copy()
+ try:
+ poe, _, _ = select(
+ [po.stderr for po in errstore], [], [], 1)
+ except (ValueError, SelectError):
+ # stderr is already closed wait for some time before
+ # checking next error
+ time.sleep(0.5)
+ continue
+ for po in errstore:
+ if po.stderr not in poe:
+ continue
+ po.lock.acquire()
+ try:
+ if po.on_death_row:
+ continue
+ la = errstore[po]
+ try:
+ fd = po.stderr.fileno()
+ except ValueError: # file is already closed
+ time.sleep(0.5)
+ continue
+
+ try:
+ l = os.read(fd, 1024)
+ except OSError:
+ time.sleep(0.5)
+ continue
+
+ if not l:
+ continue
+ tots = len(l)
+ for lx in la:
+ tots += len(lx)
+ while tots > 1 << 20 and la:
+ tots -= len(la.pop(0))
+ la.append(l)
+ finally:
+ po.lock.release()
+ t = Thread(target=tailer)
+ t.start()
+ cls.errhandler = t
+
+ @classmethod
+ def fork(cls):
+ """fork wrapper that restarts errhandler thread in child"""
+ pid = os.fork()
+ if not pid:
+ cls.init_errhandler()
+ return pid
+
+ def __init__(self, args, *a, **kw):
+ """customizations for subprocess.Popen instantiation
+
+ - 'close_fds' is taken to be the default
+ - if child's stderr is chosen to be managed,
+ register it with the error handler thread
+ """
+ self.args = args
+ if 'close_fds' not in kw:
+ kw['close_fds'] = True
+ self.lock = threading.Lock()
+ self.on_death_row = False
+ self.elines = []
+ try:
+ sup(self, args, *a, **kw)
+ except:
+ ex = sys.exc_info()[1]
+ if not isinstance(ex, OSError):
+ raise
+ raise GsyncdError("""execution of "%s" failed with %s (%s)""" %
+ (args[0], errno.errorcode[ex.errno],
+ os.strerror(ex.errno)))
+ if kw.get('stderr') == subprocess.PIPE:
+ assert(getattr(self, 'errhandler', None))
+ self.errstore[self] = []
+
+ def errlog(self):
+ """make a log about child's failure event"""
+ logging.error(lf("command returned error",
+ cmd=" ".join(self.args),
+ error=self.returncode))
+ lp = ''
+
+ def logerr(l):
+ logging.error(self.args[0] + "> " + l)
+ for l in self.elines:
+ ls = l.split('\n')
+ ls[0] = lp + ls[0]
+ lp = ls.pop()
+ for ll in ls:
+ logerr(ll)
+ if lp:
+ logerr(lp)
+
+ def errfail(self):
+ """fail nicely if child did not terminate with success"""
+ self.errlog()
+ finalize(exval=1)
+
+ def terminate_geterr(self, fail_on_err=True):
+ """kill child, finalize stderr harvesting (unregister
+ from errhandler, set up .elines), fail on error if
+ asked for
+ """
+ self.lock.acquire()
+ try:
+ self.on_death_row = True
+ finally:
+ self.lock.release()
+ elines = self.errstore.pop(self)
+ if self.poll() is None:
+ self.terminate()
+ if self.poll() is None:
+ time.sleep(0.1)
+ self.kill()
+ self.wait()
+ while True:
+ if not select([self.stderr], [], [], 0.1)[0]:
+ break
+ b = os.read(self.stderr.fileno(), 1024)
+ if b:
+ elines.append(b)
+ else:
+ break
+ self.stderr.close()
+ self.elines = elines
+ if fail_on_err and self.returncode != 0:
+ self.errfail()
+
+
+class Volinfo(object):
+
+ def __init__(self, vol, host='localhost', prelude=[]):
+ po = Popen(prelude + ['gluster', '--xml', '--remote-host=' + host,
+ 'volume', 'info', vol],
+ stdout=PIPE, stderr=PIPE)
+ vix = po.stdout.read()
+ po.wait()
+ po.terminate_geterr()
+ vi = XET.fromstring(vix)
+ if vi.find('opRet').text != '0':
+ if prelude:
+ via = '(via %s) ' % prelude.join(' ')
+ else:
+ via = ' '
+ raise GsyncdError('getting volume info of %s%s '
+ 'failed with errorcode %s' %
+ (vol, via, vi.find('opErrno').text))
+ self.tree = vi
+ self.volume = vol
+ self.host = host
+
+ def get(self, elem):
+ return self.tree.findall('.//' + elem)
+
+ def is_tier(self):
+ return (self.get('typeStr')[0].text == 'Tier')
+
+ def is_hot(self, brickpath):
+ logging.debug('brickpath: ' + repr(brickpath))
+ return brickpath in self.hot_bricks
+
+ @property
+ @memoize
+ def bricks(self):
+ def bparse(b):
+ host, dirp = b.find("name").text.split(':', 2)
+ return {'host': host, 'dir': dirp, 'uuid': b.find("hostUuid").text}
+ return [bparse(b) for b in self.get('brick')]
+
+ @property
+ @memoize
+ def uuid(self):
+ ids = self.get('id')
+ if len(ids) != 1:
+ raise GsyncdError("volume info of %s obtained from %s: "
+ "ambiguous uuid" % (self.volume, self.host))
+ return ids[0].text
+
+ def replica_count(self, tier, hot):
+ if (tier and hot):
+ return int(self.get('hotBricks/hotreplicaCount')[0].text)
+ elif (tier and not hot):
+ return int(self.get('coldBricks/coldreplicaCount')[0].text)
+ else:
+ return int(self.get('replicaCount')[0].text)
+
+ def disperse_count(self, tier, hot):
+ if (tier and hot):
+ # Tiering doesn't support disperse volume as hot brick,
+ # hence no xml output, so returning 0. In case, if it's
+ # supported later, we should change here.
+ return 0
+ elif (tier and not hot):
+ return int(self.get('coldBricks/colddisperseCount')[0].text)
+ else:
+ return int(self.get('disperseCount')[0].text)
+
+ @property
+ @memoize
+ def hot_bricks(self):
+ return [b.text for b in self.get('hotBricks/brick')]
+
+ def get_hot_bricks_count(self, tier):
+ if (tier):
+ return int(self.get('hotBricks/hotbrickCount')[0].text)
+ else:
+ return 0
--
1.8.3.1
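
Note on the resource.py hunk above: when a hybrid-crawl MKDIR hits EEXIST for a
directory gfid that already exists on the slave, the patch resolves the
directory's current path from the brick's .glusterfs backend symlink and then
renames it to the name recorded on the master. The following is a minimal
standalone sketch of that lookup-and-rename flow; brick_root, pfx, new_entry
and rename_fn are illustrative placeholders, not the actual gsyncd variables
or helpers.

    import os

    def resolve_and_rename(brick_root, pfx, gfid, new_entry, rename_fn):
        # Sketch only: brick_root stands in for a slave brick path, pfx for
        # the gfid-based path prefix, and rename_fn for gsyncd's rename
        # helper; none of these names come from the patch itself.
        #
        # The .glusterfs handle of a directory gfid is a symlink of the form
        # ../../<pgfid[0:2]>/<pgfid[2:4]>/<pgfid>/<basename>, so reading it
        # yields the directory's current parent gfid and basename on the slave.
        backend_link = os.path.join(brick_root, ".glusterfs",
                                    gfid[0:2], gfid[2:4], gfid)
        parts = os.readlink(backend_link).split('/')
        src_pargfid, src_basename = parts[-2], parts[-1]
        # Express the old location as a gfid-based path, then rename it to
        # the entry name the master recorded for the MKDIR.
        src_entry = os.path.join(pfx, src_pargfid, src_basename)
        rename_fn(gfid, src_entry, new_entry)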