Unified PS development - python #39431

Merged: 74 commits, Feb 11, 2022
Commits
b3a7e46
delete gloo connect retry
ziyoujiyi Nov 26, 2021
376e524
Merge branch 'PaddlePaddle:develop' into develop
ziyoujiyi Jan 6, 2022
588b770
Merge branch 'PaddlePaddle:develop' into develop
ziyoujiyi Jan 7, 2022
a3e3c3e
the_one_ps dirs reconstruct
ziyoujiyi Jan 7, 2022
45fdb68
Merge branch 'PaddlePaddle:develop' into develop
ziyoujiyi Jan 7, 2022
bc724bc
.
ziyoujiyi Jan 7, 2022
01fb13b
Merge branch 'develop' of https://github.com/ziyoujiyi/Paddle into de…
ziyoujiyi Jan 7, 2022
1944ad4
.
ziyoujiyi Jan 7, 2022
b86b48c
create the_one_ps dirs
ziyoujiyi Jan 11, 2022
628f1cf
create the_one_ps dirs
ziyoujiyi Jan 11, 2022
d32b2bb
create the_one_ps dirs
ziyoujiyi Jan 11, 2022
f6df7c8
create the_one_ps dirs
ziyoujiyi Jan 11, 2022
1709843
create the_one_ps dirs
ziyoujiyi Jan 11, 2022
f389f62
create the_one_ps dirs
ziyoujiyi Jan 11, 2022
be6bded
the one ps dirs modify
ziyoujiyi Jan 12, 2022
bf2c7fd
the one ps dirs modify
ziyoujiyi Jan 12, 2022
bbe72dc
the one ps dirs modify
ziyoujiyi Jan 12, 2022
bcf4ff3
the one ps dirs modify
ziyoujiyi Jan 12, 2022
428b5ca
Merge branch 'PaddlePaddle:develop' into develop
ziyoujiyi Jan 12, 2022
dc18879
refactor ps optimize
ziyoujiyi Jan 15, 2022
9ed9f54
Merge branch 'PaddlePaddle:develop' into develop
ziyoujiyi Jan 15, 2022
c91b536
refactor ps optimize
ziyoujiyi Jan 18, 2022
491fcd7
refactor ps optimize
ziyoujiyi Jan 18, 2022
a4fa15d
.
ziyoujiyi Jan 19, 2022
a9fedf5
.
ziyoujiyi Jan 19, 2022
1b29ba7
.
ziyoujiyi Jan 19, 2022
333d4d8
.
ziyoujiyi Jan 19, 2022
12902d1
.
ziyoujiyi Jan 19, 2022
a5465e4
.
ziyoujiyi Jan 19, 2022
923940f
refactor theoneps
ziyoujiyi Jan 20, 2022
6afa54b
the_one_ps
ziyoujiyi Jan 20, 2022
3937605
add ps pass unittest
ziyoujiyi Jan 24, 2022
92ac86d
add ps pass unittest
ziyoujiyi Jan 24, 2022
2bff417
ps unittest frame
ziyoujiyi Jan 25, 2022
96a9750
ps unittest frame
ziyoujiyi Jan 25, 2022
4094e8f
ps unittest frame
ziyoujiyi Jan 25, 2022
748376d
ps unittest frame
ziyoujiyi Jan 25, 2022
3591c16
ps unittest frame
ziyoujiyi Jan 25, 2022
df76b76
ps unittest frame
ziyoujiyi Jan 25, 2022
a5300f5
ps unittest frame
ziyoujiyi Jan 25, 2022
c76d79d
ps unittest frame
ziyoujiyi Jan 25, 2022
48052f3
ps unittest frame
ziyoujiyi Jan 25, 2022
3d8948c
ps unittest frame
ziyoujiyi Jan 25, 2022
dd520ff
ps unittest frame
ziyoujiyi Jan 25, 2022
24d97fb
ps unittest frame
ziyoujiyi Jan 25, 2022
15e333d
ps unittest frame
ziyoujiyi Jan 25, 2022
95a08d7
ps unittest frame
ziyoujiyi Jan 25, 2022
89c9ba7
ps unittest frame
ziyoujiyi Jan 25, 2022
50d2757
ps unittest frame
ziyoujiyi Jan 25, 2022
c50bb64
ps unittest frame
ziyoujiyi Jan 25, 2022
853d28c
add cpu_async_ps_mode test
ziyoujiyi Jan 25, 2022
11379c8
add cpu_async_ps_mode test
ziyoujiyi Jan 25, 2022
109db14
add cpu_async_ps_mode test
ziyoujiyi Jan 25, 2022
83aac71
ps unittest ready
ziyoujiyi Jan 26, 2022
1402ab2
ps unittest ready
ziyoujiyi Jan 26, 2022
25028e5
solve dist_pass init conflict
ziyoujiyi Jan 26, 2022
a0e7fb4
Merge branch 'develop' of https://github.com/ziyoujiyi/Paddle into de…
ziyoujiyi Jan 26, 2022
4fa51fe
solve import CommContext error
ziyoujiyi Jan 27, 2022
e9697ca
unittest ok
ziyoujiyi Jan 27, 2022
789b94b
implement AllocateFrom
Jan 27, 2022
cba9308
Merge branch 'PaddlePaddle:develop' into develop
ziyoujiyi Jan 27, 2022
1c5489b
Merge commit 'refs/pull/39280/head' of https://github.com/PaddlePaddl…
ziyoujiyi Jan 27, 2022
edf28f2
Merge branch 'PaddlePaddle:develop' into develop
ziyoujiyi Jan 27, 2022
6f22cb3
solve setup.py.in conflict
ziyoujiyi Jan 28, 2022
b6926f8
solve conflict
ziyoujiyi Jan 28, 2022
ef2ebaf
Merge branch 'develop' of https://github.com/ziyoujiyi/Paddle into de…
ziyoujiyi Jan 28, 2022
085b6e8
solve conflict
ziyoujiyi Jan 28, 2022
7695ddb
solve conflict
ziyoujiyi Jan 28, 2022
b006e69
.
ziyoujiyi Jan 28, 2022
87f8a33
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
ziyoujiyi Jan 28, 2022
7bf5cc1
.
ziyoujiyi Jan 28, 2022
ec83b06
Merge branch 'PaddlePaddle:develop' into develop
ziyoujiyi Feb 8, 2022
b06685f
Merge branch 'PaddlePaddle:develop' into develop
ziyoujiyi Feb 8, 2022
8a56cd4
cpu-async-ps minimize test ok & gpu minimize test ok
ziyoujiyi Feb 9, 2022
61 changes: 29 additions & 32 deletions python/paddle/distributed/fleet/meta_optimizers/ps_optimizer.py
@@ -31,7 +31,6 @@ def __init__(self, optimizer):
         self.inner_opt = optimizer
         # we do not allow meta optimizer to be inner optimizer currently
         self.meta_optimizers_white_list = []
-        self.attrs = {}
         self.pass_ctx = PassContext()
 
     def _set_basic_info(self, loss, role_maker, user_defined_optimizer,
@@ -40,50 +39,48 @@ def _set_basic_info(self, loss, role_maker, user_defined_optimizer,
             loss, role_maker, user_defined_optimizer, user_defined_strategy)
 
     def _init_ps_pass_context(self, loss, startup_program):
+        attrs = {}
         # trainer
-        self.attrs["env"] = get_dist_env()
+        attrs["env"] = get_dist_env()
 
-        self.attrs['loss'] = loss
-        self.attrs['min_block_size'] = 81920
-        self.attrs['origin_main_program'] = loss.block.program
-        self.attrs['origin_startup_program'] = startup_program
+        attrs['loss'] = loss
+        attrs['min_block_size'] = 81920
+        attrs['origin_main_program'] = loss.block.program
+        attrs['origin_startup_program'] = startup_program
 
-        self.attrs['cloned_main'] = loss.block.program.clone()
-        self.attrs['cloned_startup'] = startup_program.clone()
+        attrs['cloned_main'] = attrs['origin_main_program'].clone()
+        attrs['cloned_startup'] = attrs['origin_startup_program'].clone()
 
-        self.attrs['user_defined_strategy'] = self.user_defined_strategy
-        self.attrs['trainer'] = TrainerRuntimeConfig(self.user_defined_strategy)
-        self.attrs['ps_mode'] = self.attrs['trainer'].mode
+        attrs['user_defined_strategy'] = self.user_defined_strategy
+        attrs['trainer'] = TrainerRuntimeConfig(self.user_defined_strategy)
+        attrs['ps_mode'] = attrs['trainer'].mode
 
-        self.attrs['role_maker'] = self.role_maker
-        self.attrs[
-            'is_heter_ps_mode'] = self.role_maker._is_heter_parameter_server_mode
-        self.attrs['is_worker'] = self.role_maker._is_worker()
-        self.attrs['is_server'] = self.role_maker._is_server()
-        self.attrs['is_heter_worker'] = self.role_maker._is_heter_worker()
+        attrs['role_maker'] = self.role_maker
+        attrs[
+            'is_heter_ps_mode'] = self.role_maker._is_heter_parameter_server_mode
+        attrs['is_worker'] = self.role_maker._is_worker()
+        attrs['is_server'] = self.role_maker._is_server()
+        attrs['is_heter_worker'] = self.role_maker._is_heter_worker()
 
-        self.attrs['use_ps_gpu'] = self.user_defined_strategy.a_sync_configs[
-            "use_ps_gpu"]
-        self.attrs[
-            'lr_decay_steps'] = self.user_defined_strategy.a_sync_configs[
-                "lr_decay_steps"]
-        self.attrs['k_steps'] = self.user_defined_strategy.a_sync_configs[
-            "k_steps"]
-        self.attrs[
-            'launch_barrier'] = self.user_defined_strategy.a_sync_configs[
-                "launch_barrier"]
-
-        self.attrs['launch_barrier_flag'] = int(
+        attrs['use_ps_gpu'] = self.user_defined_strategy.a_sync_configs[
+            "use_ps_gpu"]
+        attrs['lr_decay_steps'] = self.user_defined_strategy.a_sync_configs[
+            "lr_decay_steps"]
+        attrs['k_steps'] = self.user_defined_strategy.a_sync_configs["k_steps"]
+        attrs['launch_barrier'] = self.user_defined_strategy.a_sync_configs[
+            "launch_barrier"]
+
+        attrs['launch_barrier_flag'] = int(
             os.getenv("FLAGS_LAUNCH_BARRIER", "1"))
 
-        build_var_distributed(self.attrs)
+        build_var_distributed(attrs)
 
         # server
-        self.attrs['_main_server'] = fluid.Program()
-        self.attrs['_startup_server'] = fluid.Program()
-        self.attrs['tensor_table'] = {}
+        attrs['_main_server'] = fluid.Program()
+        attrs['_startup_server'] = fluid.Program()
+        attrs['tensor_table'] = {}
 
-        self.pass_ctx._attrs = self.attrs
+        self.pass_ctx._attrs = attrs
 
     def _is_graph_out(self):
         return False
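The substantive change in this file: _init_ps_pass_context now fills a fresh local attrs dict and hands it to the pass context, instead of accumulating state on self.attrs. A minimal sketch of the pattern with stand-in classes (not Paddle's real PassContext), assuming the intent is to keep repeated minimize calls from seeing stale keys:

class PassContext:
    # stand-in for the real PassContext: just the attrs handoff
    def __init__(self):
        self._attrs = {}


class PsOptimizerSketch:
    def __init__(self):
        self.pass_ctx = PassContext()  # no attrs dict kept on the optimizer

    def _init_ps_pass_context(self, loss_name):
        attrs = {}  # fresh dict per call, so reruns cannot see stale keys
        attrs['loss'] = loss_name
        attrs['min_block_size'] = 81920
        self.pass_ctx._attrs = attrs  # handed off wholesale, as in the diff


opt = PsOptimizerSketch()
opt._init_ps_pass_context('mean_0.tmp_0')
assert opt.pass_ctx._attrs['min_block_size'] == 81920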
8 changes: 4 additions & 4 deletions python/paddle/distributed/passes/ps_server_pass.py
@@ -115,11 +115,11 @@ def _apply_single_impl(self, main_program, startup_program, pass_ctx):
                             LRScheduler), "must be LRScheduler"
 
         ops = get_optimize_ops(attrs['origin_main_program'])
-        lr_decay_main_program, lr_decay_startup_program, lr_name = _get_lr_sheduler_program(
+        lr_decay_main_program, lr_decay_startup_program, lr_name = self._get_lr_sheduler_program(
             attrs['origin_main_program'].lr_sheduler, attrs['lr_decay_steps'])
-        _add_tensor_table(attrs, "@LR_DECAY_COUNTER@", lr_name,
-                          lr_decay_startup_program, lr_decay_main_program,
-                          "GlobalStepTable")
+        self._add_tensor_table(attrs, "@LR_DECAY_COUNTER@", lr_name,
+                               lr_decay_startup_program, lr_decay_main_program,
+                               "GlobalStepTable")
         return
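The fix binds _get_lr_sheduler_program and _add_tensor_table to the pass instance; called as bare names, they would only resolve if module-level functions by those names existed. A stand-in sketch of why the unqualified call fails and the self. call works (hypothetical class and dummy return values, not the real pass):

class AddLrDecayTablePassSketch:
    # Inside apply(), the bare name _get_lr_sheduler_program only resolves
    # if a module-level function exists; self._get_lr_sheduler_program
    # resolves through the class.
    def _get_lr_sheduler_program(self, lr_sheduler, lr_decay_steps):
        return 'lr_main', 'lr_startup', 'learning_rate_0'  # dummy values

    def apply(self):
        # before the fix: _get_lr_sheduler_program(...)  -> NameError
        return self._get_lr_sheduler_program(None, 10)


print(AddLrDecayTablePassSketch().apply())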
15 changes: 12 additions & 3 deletions python/paddle/distributed/ps/utils/ps_program_builder.py
@@ -41,7 +41,7 @@ def _optimize_programs(self):
         pass
 
     def _build_trainer_programs(self):
-        pass
+        raise NotImplementedError
 
     def _build_pserver_programs(self):
         is_sgd_adam = False
@@ -60,11 +60,13 @@ def _build_pserver_programs(self):
 
     def _build_programs(self):
         if self.attrs['is_worker']:
+            logger.info("start building trainer program")
             self._build_trainer_programs()
             fluid.framework.switch_startup_program(self.cloned_startup)
             self.loss.block.program = self.cloned_main
 
         elif self.attrs['is_server']:
+            logger.info("start building pserver program")
             self._build_pserver_programs()
             self.loss.block.program = self.attrs['_main_server']
             fluid.framework.switch_startup_program(self.attrs[
@@ -73,6 +75,7 @@ def _build_programs(self):
 
 class GeoPsProgramBuilder(PsProgramBuilder):  # CPU mode only
     def __init__(self, pass_ctx):
+        logger.info("start building geo-ps program")
         super(GeoPsProgramBuilder, self).__init__(pass_ctx)
         if self.ps_mode != DistributedMode.GEO:
             raise ValueError("ps mode: {} not matched {}",
@@ -92,6 +95,7 @@ def _build_trainer_programs(self):
 
 class CpuSyncPsProgramBuilder(PsProgramBuilder):
     def __init__(self, pass_ctx):
+        logger.info("start building cpu-sync-ps program")
         super(CpuSyncPsProgramBuilder, self).__init__(pass_ctx)
         if self.ps_mode == DistributedMode.GEO:
             raise ValueError("ps mode: {} not matched {}",
@@ -130,14 +134,17 @@ def _build_trainer_programs(self):
 
 class CpuAsyncPsProgramBuilder(CpuSyncPsProgramBuilder):
     def __init__(self, pass_ctx):
+        logger.info("start building cpu-async-ps program")
         super(CpuAsyncPsProgramBuilder, self).__init__(pass_ctx)
 
 
-class GpuPsProgramBuilder(PsProgramBuilder):  # independent of the geo/sync/async modes
+class GpuPsProgramBuilder(PsProgramBuilder):
     def __init__(self, pass_ctx):
+        logger.info("start building gpu-ps program")
         super(GpuPsProgramBuilder, self).__init__(pass_ctx)
 
     def _build_trainer_programs(self):
+
         add_lr_decay_table_pass = new_pass("add_lr_decay_table_pass",
                                            self.attrs)
         add_lr_decay_table_pass.apply([], [], self.pass_ctx)
@@ -152,7 +159,8 @@ def _build_trainer_programs(self):
         ps_gpu_pass.apply([self.cloned_main], [None], self.pass_ctx)
 
         ps_transpile_pass = new_pass("ps_transpile_pass", self.attrs)
-        ps_transpile_pass.apply([_main], [_startup], self.pass_ctx)
+        ps_transpile_pass.apply([self.cloned_main], [self.cloned_startup],
+                                self.pass_ctx)
 
         self.attrs['origin_main_program'] = self.cloned_main
         self.attrs['origin_startup_program'] = self.cloned_startup
@@ -165,6 +173,7 @@ def _build_trainer_programs(self):
 
 class HeterAsyncPsProgramBuilder(PsProgramBuilder):
     def __init__(self, pass_ctx):
+        logger.info("start building heter-async-ps program")
         super(HeterAsyncPsProgramBuilder, self).__init__(pass_ctx)
         if self.use_ps_gpu or self.ps_mode == DistributedMode.GEO or self.attrs[
                 'is_heter_ps_mode'] == False:
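Taken together, these builders split program construction by mode: geo/sync/async CPU paths, a mode-agnostic GPU path, and a heterogeneous path. A minimal, hypothetical selector that mirrors the constructor guards above (the PR's real factory is not shown in this diff; the HALF_ASYNC and GEO values are assumed to continue the SYNC = 0, ASYNC = 1 numbering from public.py):

class DistributedMode:
    SYNC, ASYNC, HALF_ASYNC, GEO = 0, 1, 2, 3  # SYNC/ASYNC per this diff


def pick_builder(attrs):
    # mirrors the constructor checks: gpu and heter take precedence,
    # then the cpu builders split on ps_mode
    if attrs.get('use_ps_gpu'):
        return 'GpuPsProgramBuilder'
    if attrs.get('is_heter_ps_mode'):
        return 'HeterAsyncPsProgramBuilder'
    if attrs['ps_mode'] == DistributedMode.GEO:
        return 'GeoPsProgramBuilder'
    if attrs['ps_mode'] == DistributedMode.SYNC:
        return 'CpuSyncPsProgramBuilder'
    return 'CpuAsyncPsProgramBuilder'


print(pick_builder({'use_ps_gpu': False, 'is_heter_ps_mode': False,
                    'ps_mode': DistributedMode.GEO}))  # GeoPsProgramBuilder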
22 changes: 22 additions & 0 deletions python/paddle/distributed/ps/utils/public.py
@@ -27,6 +27,10 @@
 import paddle.fluid.framework as framework
 import paddle.distributed.fleet as fleet
 
+#logging.basicConfig(
+#    format='%(levelname)s - %(asctime)s - %(pathname)s: %(lineno)s - %(message)s', level=logging.INFO)
+#logger = logging.getLogger(__name__)
+
 OP_NAME_SCOPE = "op_namescope"
 CLIP_OP_NAME_SCOPE = "gradient_clip"
 STEP_COUNTER = "@PS_STEP_COUNTER@"
@@ -43,6 +47,24 @@
 SPARSE_OP_TYPE_DICT = {"lookup_table": "W", "lookup_table_v2": "W"}
 
 
+def logger_config(log_path, logging_name):
+    logger = logging.getLogger(logging_name)
+    logger.setLevel(level=logging.DEBUG)
+    handler = logging.FileHandler(log_path, mode='a', encoding='UTF-8')
+    handler.setLevel(logging.INFO)
+    formatter = logging.Formatter(
+        '%(levelname)s - %(asctime)s - %(pathname)s: %(lineno)s - %(message)s')
+    handler.setFormatter(formatter)
+    console = logging.StreamHandler()
+    console.setLevel(logging.DEBUG)
+    logger.addHandler(handler)
+    logger.addHandler(console)
+    return logger
+
+
+logger = logger_config(log_path='/ps_log', logging_name='ps_log')
+
+
 class DistributedMode:
     SYNC = 0
     ASYNC = 1
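The new helper attaches two handlers with different thresholds: the file records INFO and above, while the console also shows DEBUG. A runnable sketch of the same setup, with a relative log path substituted for the hard-coded /ps_log in the diff:

import logging


def logger_config(log_path, logging_name):
    logger = logging.getLogger(logging_name)
    logger.setLevel(logging.DEBUG)
    handler = logging.FileHandler(log_path, mode='a', encoding='UTF-8')
    handler.setLevel(logging.INFO)   # the file records INFO and above
    handler.setFormatter(logging.Formatter(
        '%(levelname)s - %(asctime)s - %(pathname)s: %(lineno)s - %(message)s'))
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)  # the console also shows DEBUG
    logger.addHandler(handler)
    logger.addHandler(console)
    return logger


log = logger_config('./ps_log_demo', 'ps_log_demo')
log.debug('console only: below the file handler threshold')
log.info('written to both console and ./ps_log_demo')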
56 changes: 42 additions & 14 deletions python/paddle/fluid/tests/unittests/distributed_passes/ps_pass_test_base.py
100644 → 100755
@@ -36,20 +36,48 @@ def setUp(self):
     def tearDown(self):
         print('Ps tearDown...')
 
-    def ps_launch(self, config):
-        cmd = [
-            sys.executable,
-            "-u",
-        ] + [
-            "-m", "launch", "--log_dir", config['log_dir'], "--worker_num",
-            config['worker_num'], "--server_num", config['server_num'],
-            "../ps/ps_dnn_trainer.py", "-m", config['ps_mode_config'],
-            "--run_minimize", config['run_minimize'], "--run_single_pass",
-            config['run_single_pass'], "--debug_new_pass",
-            config['debug_new_pass'], "--debug_new_minimize",
-            config['debug_new_minimize'], "--applied_pass_name",
-            config['applied_pass_name']
-        ]
+    def ps_launch(self, config, ps_mode="cpu-ps"):
+        if ps_mode == "cpu-ps":
+            os.environ['WITH_DISTRIBUTE'] = 'ON'
+
+            cmd = [
+                sys.executable,
+                "-u",
+            ] + [
+                "-m", "launch", "--log_dir", config['log_dir'], "--worker_num",
+                config['worker_num'], "--server_num", config['server_num'],
+                "../ps/ps_dnn_trainer.py", "-m", config['ps_mode_config'],
+                "--run_minimize", config['run_minimize'], "--run_single_pass",
+                config['run_single_pass'], "--debug_new_pass",
+                config['debug_new_pass'], "--debug_new_minimize",
+                config['debug_new_minimize'], "--applied_pass_name",
+                config['applied_pass_name']
+            ]
+        elif ps_mode == "gpu-ps":
+            os.environ['FLAGS_LAUNCH_BARRIER'] = '0'
+            os.environ['PADDLE_PSERVER_NUMS'] = '1'
+            os.environ['PADDLE_TRAINERS_NUM'] = '1'
+            os.environ['POD_IP'] = '127.0.0.1'
+            os.environ['PADDLE_PSERVERS_IP_PORT_LIST'] = '127.0.0.1:29011'
+            os.environ['PADDLE_PORT'] = '29011'
+            os.environ['FLAGS_selected_gpus'] = '0,1,2,3,4,5,6,7'
+            # pserver
+            # os.environ['TRAINING_ROLE'] = 'PSERVER'
+
+            # trainer
+            os.environ['TRAINING_ROLE'] = 'TRAINER'
+            os.environ['PADDLE_TRAINER_ID'] = '0'
+
+            cmd = [
+                sys.executable, "-u", "../ps/ps_dnn_trainer.py", "-m",
+                config['ps_mode_config'], "--run_minimize",
+                config['run_minimize'], "--run_single_pass",
+                config['run_single_pass'], "--debug_new_pass",
+                config['debug_new_pass'], "--debug_new_minimize",
+                config['debug_new_minimize'], "--applied_pass_name",
+                config['applied_pass_name']
+            ]
 
         cmd = [shlex.quote(c) for c in cmd]
         prepare_python_path_and_return_module(__file__)
         exitcode = os.system(' '.join(cmd))
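A sketch of how a test can now drive both launch paths; the config keys are the ones ps_launch reads above, the values are illustrative, and PsPassTestBase is the assumed name of the base class in ps_pass_test_base.py:

from ps_pass_test_base import PsPassTestBase  # assumed base-class name


class DemoPsLaunch(PsPassTestBase):
    def test_both_paths(self):
        config = {
            'log_dir': '/cpu_log_demo', 'worker_num': '1', 'server_num': '1',
            'ps_mode_config': '../ps/cpu_async_ps_config.yaml',
            'run_minimize': '1', 'run_single_pass': '0',
            'debug_new_pass': '0', 'debug_new_minimize': '1',
            'applied_pass_name': '',
        }
        self.ps_launch(config)  # default "cpu-ps": goes through "-m launch"
        config['ps_mode_config'] = '../ps/gpu_ps_config.yaml'
        self.ps_launch(config, "gpu-ps")  # sets PADDLE_* env vars, runs direct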
python/paddle/fluid/tests/unittests/distributed_passes/test_ps_trainer_pass.py
@@ -21,6 +21,7 @@
 
 import paddle
 from ps_pass_test_base import *
+from paddle.distributed.ps.utils.public import logger
 from paddle.fluid.tests.unittests.ps.ps_dnn_trainer import DnnTrainer
@@ -38,30 +39,43 @@ def init(self):
         self.config['applied_pass_name'] = ""
 
     def setUp(self):
-        print('TestPsTrainerPass setUp...')
+        pass
 
     def tearDown(self):
-        print('TestPsTrainerPass tearDown...')
+        pass
 
     def check(self):
         pass
 
-    def test_ps_optimizer_minimize(self):
+    def test_ps_optimizer_minimize_cpu(self):
         self.init()
         self.config['run_minimize'] = '1'
 
         self.config['debug_new_minimize'] = '0'
-        self.config['log_dir'] = "/log_old_minimize"
+        self.config['log_dir'] = "/cpu_log_old_minimize"
         remove_path_if_exists(self.config['log_dir'])
         self.ps_launch(self.config)
 
         self.config['debug_new_minimize'] = '1'
-        self.config['log_dir'] = "/log_new_minimize"
+        self.config['log_dir'] = "/cpu_log_new_minimize"
         remove_path_if_exists(self.config['log_dir'])
         self.ps_launch(self.config)
 
         self.check()
 
+    def test_ps_optimizer_minimize_gpu(self):
+        self.init()
+        self.config['run_minimize'] = '1'
+        self.config['ps_mode_config'] = "../ps/gpu_ps_config.yaml"
+
+        self.config['debug_new_minimize'] = '0'
+        self.ps_launch(self.config, "gpu-ps")
+
+        self.config['debug_new_minimize'] = '1'
+        self.ps_launch(self.config, "gpu-ps")
+
+        self.check()
+
     def test_append_send_ops_pass(self):
         self.init()
         self.config['run_single_pass'] = '1'
@@ -70,12 +84,12 @@ def test_append_send_ops_pass(self):
         self.config['debug_new_pass'] = '0'
         self.config['log_dir'] = "/log_old_" + self.config['applied_pass_name']
         remove_path_if_exists(self.config['log_dir'])
-        self.ps_launch(self.config)
+        self.ps_launch(self.config, "cpu-ps")
 
         self.config['debug_new_pass'] = '1'
         self.config['log_dir'] = "/log_new_" + self.config['applied_pass_name']
         remove_path_if_exists(self.config['log_dir'])
-        self.ps_launch(self.config)
+        self.ps_launch(self.config, "cpu-ps")
 
         self.check()
@@ -84,4 +98,5 @@ def test_distributed_ops_pass(self):
 
 
 if __name__ == '__main__':
+    remove_path_if_exists('/ps_log')
     unittest.main()
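Each test follows the same A/B pattern: run the workload once with debug_new_minimize (or debug_new_pass) set to '0' for the legacy path and once with '1' for the refactored path, writing separate log dirs that check() can later compare; check() itself is still a stub in this PR. A condensed, hypothetical sketch of that comparison:

import filecmp


def ab_compare(run, old_dir, new_dir):
    # run() is a hypothetical callable wrapping ps_launch; the real tests
    # do this inline via self.config and self.ps_launch.
    run(debug_flag='0', log_dir=old_dir)  # legacy code path
    run(debug_flag='1', log_dir=new_dir)  # refactored code path
    # identical outputs mean the refactor preserved behavior
    return filecmp.dircmp(old_dir, new_dir).diff_files == []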
2 changes: 2 additions & 0 deletions python/paddle/fluid/tests/unittests/ps/cpu_async_ps_config.yaml
100644 → 100755
@@ -12,6 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+# refer to PaddleRec/models/rank/dnn/benchmark.yaml
+
 hyper_parameters:
   optimizer:
     class: Adam