Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement: Add gpshrink to support elastic scaling #393

Merged
merged 1 commit into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,49 @@ jobs:
/code/gpdb_src/src/test/isolation/expected/
/code/gpdb_src/gpAux/gpdemo/datadirs/standby/log/
/code/gpdb_src/gpAux/gpdemo/datadirs/singlenodedir/demoDataDir-1/log/
# Runs the isolation2 expand/shrink regression suite against the package
# produced by the "build" job, mirroring the structure of the other icw jobs.
icw-expandshrink-test:
  needs: build
  runs-on: [ self-hosted, example ]
  env:
    # Make target executed by the ci driver script below.
    MAKE_TEST_COMMAND: "-C src/test/isolation2 installcheck-expandshrink"
    TEST_OS: "centos"
    DUMP_DB: "true"
  steps:
    - uses: actions/checkout@v3
      with:
        path: "gpdb_src"
    # Build metadata and the built package are published by the build job.
    - uses: actions/download-artifact@v3
      with:
        name: cbdb-variables
        path: /opt/
    - uses: actions/download-artifact@v3
      with:
        name: cbdb-package
        path: /opt/
    - name: Run icw-test script
      run: |
        mkdir /code
        cp -a gpdb_src/ /code
        cd /code
        echo $GITHUB_RUN_ID > gpdb_src/BUILD_NUMBER
        # NOTE(review): FTS_MODE is not defined in this job's env (other icw
        # jobs set it via a matrix) — confirm icw_cbdb.bash tolerates an
        # empty first argument.
        gpdb_src/hd-ci/icw_cbdb.bash $FTS_MODE
    # Collect regression output and all segment logs only when the suite fails.
    - uses: actions/upload-artifact@v3
      if: failure()
      with:
        name: cbdb-icw-expandshrink-test-log
        path: |
          /code/gpdb_src/src/test/isolation2/regression.out
          /code/gpdb_src/src/test/isolation2/regression.diffs
          /code/gpdb_src/src/test/isolation2/results/
          /code/gpdb_src/src/test/isolation2/expected/
          /code/gpdb_src/gpAux/gpdemo/datadirs/standby/log/
          /code/gpdb_src/gpAux/gpdemo/datadirs/qddir/demoDataDir-1/log/
          /code/gpdb_src/gpAux/gpdemo/datadirs/dbfast1/demoDataDir0/log/
          /code/gpdb_src/gpAux/gpdemo/datadirs/dbfast2/demoDataDir1/log/
          /code/gpdb_src/gpAux/gpdemo/datadirs/dbfast3/demoDataDir2/log/
          /code/gpdb_src/gpAux/gpdemo/datadirs/dbfast_mirror1/demoDataDir0/log/
          /code/gpdb_src/gpAux/gpdemo/datadirs/dbfast_mirror2/demoDataDir1/log/
          /code/gpdb_src/gpAux/gpdemo/datadirs/dbfast_mirror3/demoDataDir2/log/
icw-orca-test:
needs: build
runs-on: [self-hosted, example]
Expand Down
4 changes: 2 additions & 2 deletions gpMgmt/bin/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ SUBDIRS += ifaddrs
$(recurse)

PROGRAMS= analyzedb gpactivatestandby gpaddmirrors gpcheckcat gpcheckperf \
gpcheckresgroupimpl gpconfig gpdeletesystem gpexpand gpinitstandby \
gpcheckresgroupimpl gpconfig gpdeletesystem gpexpand gpshrink gpinitstandby \
gpinitsystem gpload gpload.py gplogfilter gpmovemirrors \
gppkg gprecoverseg gpreload gpscp gpsd gpssh gpssh-exkeys gpstart \
gpstate gpstop minirepro gpmemwatcher gpmemreport gpdemo
Expand Down Expand Up @@ -194,7 +194,7 @@ clean distclean:
rm -rf *.pyc
rm -f analyzedbc gpactivatestandbyc gpaddmirrorsc gpcheckcatc \
gpcheckperfc gpcheckresgroupimplc gpchecksubnetcfgc gpconfigc \
gpdeletesystemc gpexpandc gpinitstandbyc gplogfilterc gpmovemirrorsc \
gpdeletesystemc gpexpandc gpshrinkc gpinitstandbyc gplogfilterc gpmovemirrorsc \
gppkgc gprecoversegc gpreloadc gpscpc gpsdc gpssh-exkeysc gpsshc \
gpstartc gpstatec gpstopc minireproc
rm -f gpconfig_modules/gucs_disallowed_in_file.txt
30 changes: 30 additions & 0 deletions gpMgmt/bin/gpexpand
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,8 @@ class gpexpand:
tblspc_info = {}

for oid in tblspc_oids:
if oid not in tblspc_oid_names:
continue
location = os.path.dirname(os.readlink(os.path.join(coordinator_tblspc_dir,
oid)))
tblspc_info[oid] = {"location": location,
Expand Down Expand Up @@ -1222,6 +1224,15 @@ class gpexpand:
coordinator_tblspc_dir = self.gparray.coordinator.getSegmentTableSpaceDirectory()
if not os.listdir(coordinator_tblspc_dir):
return None

tblspc_oids = os.listdir(coordinator_tblspc_dir)
tblspc_oid_names = self.get_tablespace_oid_names()
flag = False
for oid in tblspc_oids:
if oid in tblspc_oid_names:
flag = True
if not flag:
return None

if not self.options.filename:
raise ExpansionError('Missing tablespace input file')
Expand Down Expand Up @@ -1385,6 +1396,25 @@ class gpexpand:
self.pool.join()
self.pool.check_results()


# Poll the new primary segments with pg_isready until every one of them
# accepts connections, retrying up to 11 times with a 10 second pause
# (~110s total) before giving up silently and letting later steps fail.
for attempt in range(1, 12):
    all_ready = True
    for segment in newSegments:
        # Mirrors do not accept client connections; only probe primaries.
        # BUGFIX: the original tested `seg.isSegmentMirror()`, but `seg` is
        # undefined here — the loop variable is `segment` (NameError).
        if segment.isSegmentMirror():
            continue

        # NOTE(review): the -d argument is given the segment's data
        # directory, but pg_isready expects a database name — confirm intent.
        cmd = Command('pg_isready for segment',
                      "pg_isready -q -h %s -p %d -d %s" % (segment.getSegmentHostName(),
                                                           segment.getSegmentPort(),
                                                           segment.getSegmentDataDirectory()))
        cmd.run()
        if cmd.get_return_code() != 0:
            # At least one primary is not ready yet; keep probing the rest
            # so a single slow segment does not hide others' status.
            all_ready = False
    if all_ready:
        break
    time.sleep(10)
    self.logger.info("Waiting for segment ready last for %s second" % (attempt * 10))


"""
Build the list of delete statements based on the COORDINATOR_ONLY_TABLES
defined in gpcatalog.py
Expand Down
123 changes: 117 additions & 6 deletions gpMgmt/bin/gppylib/gparray.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def __equal(self, other, ignoreAttr=[]):
return True

def __eq__(self, other):
    """Compare two Segment objects attribute-by-attribute.

    The 'mode' attribute is deliberately excluded from the comparison
    (passed as the ignoreAttr list to __equal) — presumably because a
    segment's sync mode can differ transiently without the segments being
    logically different; confirm against callers that rely on this.
    """
    return self.__equal(other, ['mode'])


def __hash__(self):
Expand Down Expand Up @@ -429,6 +429,9 @@ def __str__(self):
return "(Primary: %s, Mirror: %s)" % (str(self.primaryDB),
str(self.mirrorDB))

def __eq__(self, other):
    """Two pairs are equal when both their primary and mirror Segments compare equal."""
    # Gracefully decline comparison with non-SegmentPair objects instead of
    # raising AttributeError on the missing primaryDB/mirrorDB attributes.
    if not isinstance(other, SegmentPair):
        return NotImplemented
    # NOTE(review): defining __eq__ without __hash__ makes SegmentPair
    # unhashable in Python 3 — add an explicit __hash__ if pairs are ever
    # stored in sets or used as dict keys; confirm no caller does that.
    return self.primaryDB == other.primaryDB and self.mirrorDB == other.mirrorDB

# --------------------------------------------------------------------
def addPrimary(self,segDB):
self.primaryDB=segDB
Expand Down Expand Up @@ -799,6 +802,7 @@ def __init__(self, segments, segmentsAsLoadedFromDb=None):
self.standbyCoordinator = None
self.segmentPairs = []
self.expansionSegmentPairs=[]
self.shrinkSegmentPairs=[]
self.numPrimarySegments = 0

self.recoveredSegmentDbids = []
Expand Down Expand Up @@ -1045,7 +1049,7 @@ def dumpToFile(self, filename):
fp.close()

# --------------------------------------------------------------------
def getDbList(self, includeExpansionSegs=False):
def getDbList(self, includeExpansionSegs=False, removeShrinkSegs=False):
"""
Return a list of all Segment objects that make up the array
"""
Expand All @@ -1054,8 +1058,8 @@ def getDbList(self, includeExpansionSegs=False):
dbs.append(self.coordinator)
if self.standbyCoordinator:
dbs.append(self.standbyCoordinator)
if includeExpansionSegs:
dbs.extend(self.getSegDbList(True))
if includeExpansionSegs or removeShrinkSegs:
dbs.extend(self.getSegDbList(includeExpansionSegs, removeShrinkSegs))
else:
dbs.extend(self.getSegDbList())
return dbs
Expand Down Expand Up @@ -1105,23 +1109,29 @@ def getDbIdToPeerMap(self):


# --------------------------------------------------------------------
def getSegDbList(self, includeExpansionSegs=False, removeShrinkSegs=False):
    """Return a list of all Segment objects for all segments in the array.

    :param includeExpansionSegs: also include segments queued by an expansion.
    :param removeShrinkSegs: exclude segments queued for removal by a shrink.
    """
    # (The stale pre-change signature line left over from the diff has been
    # dropped; this is the single post-change definition.)
    dbs = []
    for segPair in self.segmentPairs:
        dbs.extend(segPair.get_dbs())
    if includeExpansionSegs:
        for segPair in self.expansionSegmentPairs:
            dbs.extend(segPair.get_dbs())
    if removeShrinkSegs:
        # Filter out any db belonging to a pair queued for removal;
        # Segment equality (not identity) decides membership.
        for segPair in self.shrinkSegmentPairs:
            dbs = [db for db in dbs
                   if segPair.primaryDB != db and segPair.mirrorDB != db]
    return dbs

# --------------------------------------------------------------------
def getSegmentList(self, includeExpansionSegs=False, removeShrinkSegs=False):
    """Return a list of SegmentPair objects for all segments in the array.

    :param includeExpansionSegs: also include pairs queued by an expansion.
    :param removeShrinkSegs: exclude pairs queued for removal by a shrink.
    """
    # (The stale pre-change signature line left over from the diff has been
    # dropped; this is the single post-change definition.)
    pairs = []
    pairs.extend(self.segmentPairs)
    if includeExpansionSegs:
        pairs.extend(self.expansionSegmentPairs)
    if removeShrinkSegs:
        for segPair in self.shrinkSegmentPairs:
            # list.remove matches by equality; raises ValueError if the
            # queued pair is not part of the array (validated upstream).
            pairs.remove(segPair)
    return pairs

# --------------------------------------------------------------------
Expand All @@ -1148,6 +1158,21 @@ def getExpansionSegPairList(self):
"""Returns a list of all SegmentPair objects that make up the new segments
of an expansion"""
return self.expansionSegmentPairs

# --------------------------------------------------------------------
def getShrinkSegDbList(self):
    """Returns a list of all Segment objects (primaries and mirrors) that
    are queued for removal by a shrink operation."""
    dbs=[]
    for segPair in self.shrinkSegmentPairs:
        dbs.extend(segPair.get_dbs())
    return dbs

# --------------------------------------------------------------------
def getShrinkSegPairList(self):
    """Returns the list of SegmentPair objects that are queued for removal
    by a shrink operation."""
    return self.shrinkSegmentPairs

# --------------------------------------------------------------------
def getSegmentContainingDb(self, db):
Expand All @@ -1164,6 +1189,15 @@ def getExpansionSegmentContainingDb(self, db):
if db.getSegmentDbId() == segDb.getSegmentDbId():
return segPair
return None

# --------------------------------------------------------------------
def getShrinkSegmentContainingDb(self, db):
    """Return the shrink-queued SegmentPair whose primary or mirror shares
    *db*'s dbid, or None when no queued pair contains it."""
    wanted_dbid = db.getSegmentDbId()
    for pair in self.shrinkSegmentPairs:
        if any(member.getSegmentDbId() == wanted_dbid for member in pair.get_dbs()):
            return pair
    return None

# --------------------------------------------------------------------
def get_invalid_segdbs(self):
dbs=[]
Expand Down Expand Up @@ -1488,6 +1522,37 @@ def addExpansionSeg(self, content, preferred_role, dbid, role,
else:
seg.addMirror(segdb)

# --------------------------------------------------------------------
def addShrinkSeg(self, content, preferred_role, dbid, role,
                 hostname, address, port, datadir):
    """
    Queue an existing segment for removal by adding it to the gparray's
    shrink list.

    Primaries must be added before their mirrors: a primary opens a fresh
    SegmentPair, and a subsequent mirror call attaches to the most recently
    opened pair.

    Raises Exception when a mirror arrives before any primary, or when the
    current pair already has a mirror.

    NOTE(review): mode/status are hard-coded to MODE_SYNCHRONIZED/STATUS_UP
    regardless of the segment's actual state — confirm downstream code does
    not rely on the real values.

    Note: may work better to construct the new Segment in gpshrink and
    simply pass it in.
    """
    segdb = Segment(content=content,
                    preferred_role=preferred_role,
                    dbid=dbid,
                    role=role,
                    mode=MODE_SYNCHRONIZED,
                    status=STATUS_UP,
                    hostname=hostname,
                    address=address,
                    port=port,
                    datadir=datadir)

    if preferred_role == ROLE_PRIMARY:
        # A primary always starts a new pair, so the original "duplicate
        # primary" check against the just-appended pair could never fire
        # and has been removed as dead code.
        pair = SegmentPair()
        pair.addPrimary(segdb)
        self.shrinkSegmentPairs.append(pair)
    else:
        # BUGFIX: the original indexed shrinkSegmentPairs[-1] unchecked
        # (IndexError on a leading mirror) and silently overwrote an
        # already-attached mirror.
        if not self.shrinkSegmentPairs:
            raise Exception('Mirror segment added before any primary segment')
        pair = self.shrinkSegmentPairs[-1]
        if pair.mirrorDB:
            raise Exception('Duplicate mirror segment for content id')
        pair.addMirror(segdb)

# --------------------------------------------------------------------
def reOrderExpansionSegs(self):
"""
Expand Down Expand Up @@ -1595,6 +1660,52 @@ def validateExpansionSegs(self):
else:
used_ports[hostname] = []
used_ports[hostname].append(db.port)

# --------------------------------------------------------------------
def validateShrinkSegs(self):
    """Check the segment pairs queued for removal for inconsistencies.

    Raises Exception when:
      * no shrink segments have been queued;
      * removing them would empty (or more than empty) the cluster;
      * mirroring is enabled but a pair is missing its mirror, or the
        primary/mirror content ids disagree;
      * a queued pair is not present in gp_segment_configuration;
      * the queued pairs are not the highest, contiguous content ids.

    Side effect: sorts self.shrinkSegmentPairs by primary content id.
    """
    if len(self.shrinkSegmentPairs) == 0:
        raise Exception('No shrink segments defined')

    totalsize = len(self.segmentPairs)
    removesize = len(self.shrinkSegmentPairs)

    # BUGFIX: the original logged and called exit(1) here, killing the
    # whole process instead of raising like every other validation in this
    # class; the `removesize < 1` branch was unreachable (the empty case
    # already raised above) and has been removed.
    if removesize >= totalsize:
        raise Exception('removed segment num %d more than or equal to total segment num %d'
                        % (removesize, totalsize))

    for segPair in self.shrinkSegmentPairs:
        if self.hasMirrors:
            if segPair.mirrorDB is None:
                raise Exception('primaryDB and mirrorDB should be removed simultaneously')

            if segPair.primaryDB.content != segPair.mirrorDB.content:
                raise Exception('primaryDB content is not equal mirrorDB content')

        # Every queued pair must exist in the current cluster; any() stops
        # at the first match instead of scanning the whole list.
        if not any(existing == segPair for existing in self.segmentPairs):
            raise Exception('Shrink segments not in the gp_segment_configuration table')

    # Shrinking only supports trimming the highest, contiguous content ids.
    self.shrinkSegmentPairs.sort(key=lambda segPair: segPair.primaryDB.content)

    max_content = self.get_max_contentid()
    if self.shrinkSegmentPairs[-1].primaryDB.content != max_content:
        raise Exception('please remove segment from max contentid')

    if self.shrinkSegmentPairs[0].primaryDB.content != max_content - len(self.shrinkSegmentPairs) + 1:
        raise Exception('please remove segment in continuous contentid')


# --------------------------------------------------------------------
def addExpansionHosts(self, hosts, mirror_type):
Expand Down
2 changes: 1 addition & 1 deletion gpMgmt/bin/gppylib/system/ComputeCatalogUpdate.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(self, gpArray, forceMap, useUtilityMode, allowPrimary):
self.dbsegmap = dict([(seg.getSegmentDbId(), seg) for seg in gpArray.getSegmentsAsLoadedFromDb()])

# 'goalsegmap' reflects the desired state of the catalog
self.goalsegmap = dict([(seg.getSegmentDbId(), seg) for seg in gpArray.getDbList(includeExpansionSegs=True)])
self.goalsegmap = dict([(seg.getSegmentDbId(), seg) for seg in gpArray.getDbList(includeExpansionSegs=True, removeShrinkSegs=True)])

# find mirrors and primaries to remove
self.mirror_to_remove = [
Expand Down
Loading
Loading