Skip to content

Commit

Permalink
Merge pull request #1051 from jacquesqiao/add-pserver-util
Browse files Browse the repository at this point in the history
Add ParameterServerController for parameter server python api
  • Loading branch information
jacquesqiao authored Jan 11, 2017
2 parents 77eb729 + aa9f516 commit f8a529c
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 108 deletions.
1 change: 1 addition & 0 deletions demo/quick_start/cluster/cluster_train.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ log_file="$bin_dir/train.log"
pushd "$home_dir"
cfg=trainer_config.lr.py
paddle train \
--start_pserver=false \
--config=$cfg \
--save_dir=${model_dir} \
--trainer_count=4 \
Expand Down
6 changes: 4 additions & 2 deletions paddle/pserver/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ set(PSERVER_SOURCES
BaseClient.cpp
ParameterClient2.cpp
ParameterServer2.cpp
SparseParameterDistribution.cpp)
SparseParameterDistribution.cpp
ParameterServerController.cpp)

set(PSERVER_HEADERS
BaseClient.h
ParameterClient2.h
ParameterServer2.h
SparseParameterDistribution.h)
SparseParameterDistribution.h
ParameterServerController.h)

add_library(paddle_pserver STATIC
${PSERVER_SOURCES})
Expand Down
59 changes: 5 additions & 54 deletions paddle/pserver/ParameterServer2Main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,66 +13,17 @@ See the License for the specific language governing permissions and
limitations under the License. */

#include <fstream>
#include "paddle/utils/StringUtil.h"
#include "paddle/utils/Util.h"

#include "ParameterServer2.h"
#include "RDMANetwork.h"
#include "paddle/utils/Flags.h"
#include "ParameterServerController.h"

using namespace paddle; // NOLINT

// Entry point: initializes the process with initMain(), brings up
// ParameterServer2 instances and blocks until they have been joined.
//
// NOTE(review): this span contains BOTH an inline start-up loop over
// FLAGS_* values and a ParameterServerController-based path. It looks like the
// removed and the added side of a diff rendered together -- confirm which one
// is intended before building; as written the inline servers are joined
// before the controller is even created.
int main(int argc, char** argv) {
initMain(argc, argv);

std::vector<std::string> devices;
std::vector<std::shared_ptr<ParameterServer2>> pservers;

// round robin to loadbalance RDMA server ENGINE
// rdmaCpu is handed to each RDMA server and then advanced modulo onlineCpus.
int rdmaCpu = 0;
int onlineCpus = rdma::numCpus();
// One server per dense port plus one per sparse port.
int numPorts = FLAGS_ports_num + FLAGS_ports_num_for_sparse;
if (FLAGS_nics.empty()) {
// No NIC list given: one server per port, constructed with an empty address.
pservers.resize(numPorts);
for (int i = 0; i < numPorts; ++i) {
if (FLAGS_rdma_tcp == "rdma") {
pservers[i].reset(
new ParameterServer2(std::string(), FLAGS_port + i, rdmaCpu++));
rdmaCpu = rdmaCpu % onlineCpus;
} else {
pservers[i].reset(new ParameterServer2(std::string(), FLAGS_port + i));
}
CHECK(pservers[i]->init()) << "Fail to initialize parameter server"
<< FLAGS_port + i;
LOG(INFO) << "pserver started : " << FLAGS_port + i;
pservers[i]->start();
}
} else {
// FLAGS_nics is a comma-separated device list: one server per (port, device)
// pair, stored port-major at index i * devices.size() + j.
str::split(FLAGS_nics, ',', &devices);
pservers.resize(devices.size() * numPorts);
for (int i = 0; i < numPorts; ++i) {
for (size_t j = 0; j < devices.size(); ++j) {
if (FLAGS_rdma_tcp == "rdma") {
pservers[i * devices.size() + j].reset(new ParameterServer2(
getIpAddr(devices[j]), FLAGS_port + i, rdmaCpu++));
rdmaCpu = rdmaCpu % onlineCpus;
} else {
pservers[i * devices.size() + j].reset(
new ParameterServer2(getIpAddr(devices[j]), FLAGS_port + i));
}
CHECK(pservers[i * devices.size() + j]->init())
<< "Fail to initialize parameter server" << devices[j]
<< FLAGS_port + i;
LOG(INFO) << "pserver started : " << devices[j] << ":"
<< FLAGS_port + i;
pservers[i * devices.size() + j]->start();
}
}
}

// Join every inline-created server before continuing.
for (auto& pserver : pservers) {
pserver->join();
}
// Controller-based path: build the controller from gflags, start all of its
// servers, then wait() on them. The unique_ptr owns the controller returned
// by createFromGflags().
std::unique_ptr<ParameterServerController> parameterServerPtr(
paddle::ParameterServerController::createFromGflags());
parameterServerPtr->start();
parameterServerPtr->wait();

return 0;
}
102 changes: 102 additions & 0 deletions paddle/pserver/ParameterServerController.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "ParameterServerController.h"

namespace paddle {

/**
 * @brief Create and initialize every ParameterServer2 described by config.
 *
 * One server is created for each port in
 * [port, port + ports_num + ports_num_for_sparse). When config.nics() is
 * empty each server is constructed with an empty address string; otherwise
 * nics is split on ',' and one server is created per (port, device) pair,
 * addressed by getIpAddr(device) and stored port-major at index
 * i * devices.size() + j. Servers are only constructed and init()-ed here;
 * start() is what calls start() on them.
 *
 * @param config supplies nics, rdma_tcp, port, ports_num and
 *        ports_num_for_sparse.
 */
ParameterServerController::ParameterServerController(
    const ParameterServerConfig& config) {
  std::vector<std::string> devices;
  // round robin to load balance RDMA server ENGINE: every RDMA server takes
  // the current cpu index, which is then advanced modulo rdma::numCpus().
  int rdmaCpu = 0;
  const int onlineCpus = rdma::numCpus();
  const int numPorts = config.ports_num() + config.ports_num_for_sparse();
  // The transport choice does not change while constructing, so test it once.
  const bool useRdma = config.rdma_tcp() == "rdma";

  if (config.nics().empty()) {
    parameterServers_.resize(numPorts);
    for (int i = 0; i < numPorts; ++i) {
      const auto port = config.port() + i;
      auto& server = parameterServers_[i];
      if (useRdma) {
        server.reset(new ParameterServer2(std::string(), port, rdmaCpu++));
        rdmaCpu = rdmaCpu % onlineCpus;
      } else {
        server.reset(new ParameterServer2(std::string(), port));
      }
      CHECK(server->init())
          << "Fail to initialize parameter server on port " << port;
    }
  } else {
    str::split(config.nics(), ',', &devices);
    parameterServers_.resize(devices.size() * numPorts);
    for (int i = 0; i < numPorts; ++i) {
      const auto port = config.port() + i;
      for (size_t j = 0; j < devices.size(); ++j) {
        auto& server = parameterServers_[i * devices.size() + j];
        if (useRdma) {
          server.reset(
              new ParameterServer2(getIpAddr(devices[j]), port, rdmaCpu++));
          rdmaCpu = rdmaCpu % onlineCpus;
        } else {
          server.reset(new ParameterServer2(getIpAddr(devices[j]), port));
        }
        // Separate device and port with ':' so the message reads
        // "eth0:8000" instead of the ambiguous "eth08000".
        CHECK(server->init())
            << "Fail to initialize parameter server with device "
            << devices[j] << ":" << port;
      }
    }
  }
}

/**
 * @brief Dtor. Calls wait(), which joins every managed ParameterServer2,
 * before the members are destroyed.
 */
ParameterServerController::~ParameterServerController() {
  wait();
}

/**
 * @brief Build a ParameterServerConfig from the pserver gflags and forward it
 * to create().
 *
 * Copies FLAGS_port, FLAGS_ports_num, FLAGS_ports_num_for_sparse, FLAGS_nics
 * and FLAGS_rdma_tcp into the matching config fields.
 *
 * @return the controller returned by create().
 */
ParameterServerController* ParameterServerController::createFromGflags() {
  ParameterServerConfig flagsConfig;

  // Port layout.
  flagsConfig.set_port(FLAGS_port);
  flagsConfig.set_ports_num(FLAGS_ports_num);
  flagsConfig.set_ports_num_for_sparse(FLAGS_ports_num_for_sparse);

  // Network devices and transport.
  flagsConfig.set_nics(FLAGS_nics);
  flagsConfig.set_rdma_tcp(FLAGS_rdma_tcp);

  return create(flagsConfig);
}

/**
 * @brief Allocate a ParameterServerController for the given config.
 *
 * @param config configuration passed straight to the constructor.
 * @return pointer to a controller allocated with new.
 */
ParameterServerController* ParameterServerController::create(
    const ParameterServerConfig& config) {
  auto* controller = new ParameterServerController(config);
  return controller;
}

/**
 * @brief Log the number of managed servers, then call start() on each one in
 * storage order, logging its index first.
 */
void ParameterServerController::start() {
  const auto serverCount = parameterServers_.size();
  LOG(INFO) << "number of parameterServer instances: " << serverCount;
  for (size_t idx = 0; idx < serverCount; ++idx) {
    LOG(INFO) << "Starting parameterServer[" << idx << "]";
    parameterServers_[idx]->start();
  }
}

void ParameterServerController::wait() {
int i = 0;
for (const auto& parameterServer : parameterServers_) {
LOG(INFO) << "Waiting parameterServer[" << i << "]";
parameterServer->join();
i++;
}
}

} // namespace paddle
74 changes: 74 additions & 0 deletions paddle/pserver/ParameterServerController.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once

#include "ParameterServer2.h"
#include "ParameterServerConfig.pb.h"
#include "RDMANetwork.h"
#include "paddle/utils/StringUtil.h"

namespace paddle {

/**
 * @brief ParameterServerController creates, initializes and manages multiple
 * ParameterServer2 instances. The number of instances is determined by the
 * port count (ports_num + ports_num_for_sparse) and, when network devices
 * (nics) are configured, by the number of devices; both come from gflags or
 * from a ParameterServerConfig proto.
 */
class ParameterServerController final {
public:
// NOTE(review): presumably deletes the copy constructor and copy assignment;
// confirm against the DISABLE_COPY macro definition.
DISABLE_COPY(ParameterServerController);

/**
 * @brief Ctor. Creates a ParameterServerController from a
 * ParameterServerConfig.
 */
explicit ParameterServerController(const ParameterServerConfig& config);

/**
 * @brief Dtor.
 */
~ParameterServerController();

/**
 * @brief Create a ParameterServerController from gflags. Kept for
 * compatibility with the old gflags-based configuration.
 *
 * @return pointer to the newly created controller.
 */
static ParameterServerController* createFromGflags();

/**
 * @brief Create a ParameterServerController from a ParameterServerConfig, so
 * that callers do not need gflags. All ParameterServer2 instances are
 * initialized according to the config.
 *
 * @param config the parameter server configuration.
 * @return pointer to the newly created controller.
 */
static ParameterServerController* create(const ParameterServerConfig& config);

/**
 * @brief Start all ParameterServer2 instances managed by this
 * ParameterServerController.
 */
void start();

/**
 * @brief Join all ParameterServer2 instances managed by this
 * ParameterServerController, returning once every join() has returned.
 */
void wait();

private:
// The managed server instances, owned by this controller.
std::vector<std::unique_ptr<ParameterServer2>> parameterServers_;
};

} // namespace paddle
57 changes: 6 additions & 51 deletions paddle/trainer/TrainerMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,12 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/pserver/ParameterServer2.h"
#include "paddle/utils/Common.h"
#include <fenv.h>
#include "paddle/pserver/ParameterServerController.h"
#include "paddle/utils/PythonUtil.h"
#include "paddle/utils/StringUtil.h"

#include "ParamUtil.h"
#include "Trainer.h"
#include "paddle/pserver/RDMANetwork.h"

DEFINE_bool(start_pserver, false, "Whether to start pserver");
DECLARE_int32(gpu_id);
Expand All @@ -38,54 +36,11 @@ int main(int argc, char** argv) {
initMain(argc, argv);
initPython(argc, argv);

std::vector<std::unique_ptr<ParameterServer2>> pservers;
std::vector<std::string> devices;

std::unique_ptr<ParameterServerController> parameterServerPtr(nullptr);
if (FLAGS_start_pserver) {
// round robin to loadbalance RDMA server ENGINE
int rdmaCpu = 0;
int onlineCpus = rdma::numCpus();
int numPorts = FLAGS_ports_num + FLAGS_ports_num_for_sparse;
if (FLAGS_nics.empty()) {
pservers.resize(numPorts);
for (int i = 0; i < numPorts; ++i) {
if (FLAGS_rdma_tcp == "rdma") {
pservers[i].reset(
new ParameterServer2(std::string(), FLAGS_port + i, rdmaCpu++));
rdmaCpu = rdmaCpu % onlineCpus;
} else {
pservers[i].reset(
new ParameterServer2(std::string(), FLAGS_port + i));
}

CHECK(pservers[i]->init()) << "Fail to initialize parameter server"
<< FLAGS_port + i;
LOG(INFO) << "pserver started : " << FLAGS_port + i;
pservers[i]->start();
}
} else {
str::split(FLAGS_nics, ',', &devices);
pservers.resize(devices.size() * numPorts);
for (int i = 0; i < numPorts; ++i) {
for (size_t j = 0; j < devices.size(); ++j) {
if (FLAGS_rdma_tcp == "rdma") {
pservers[i * devices.size() + j].reset(new ParameterServer2(
getIpAddr(devices[j]), FLAGS_port + i, rdmaCpu++));
rdmaCpu = rdmaCpu % onlineCpus;
} else {
pservers[i * devices.size() + j].reset(
new ParameterServer2(getIpAddr(devices[j]), FLAGS_port + i));
}

CHECK(pservers[i * devices.size() + j]->init())
<< "Fail to initialize parameter server" << devices[j]
<< FLAGS_port + i;
LOG(INFO) << "pserver started : " << devices[j] << ":"
<< FLAGS_port + i;
pservers[i * devices.size() + j]->start();
}
}
}
parameterServerPtr.reset(
paddle::ParameterServerController::createFromGflags());
parameterServerPtr->start();
}
Trainer trainer;
auto config = TrainerConfigHelper::createFromFlags();
Expand Down
3 changes: 2 additions & 1 deletion proto/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ set(proto_filenames
ModelConfig.proto
ParameterConfig.proto
ParameterService.proto
TrainerConfig.proto)
TrainerConfig.proto
ParameterServerConfig.proto)

set(PROTO_GEN)
set(PROTO_GEN_PY)
Expand Down
Loading

0 comments on commit f8a529c

Please sign in to comment.