Fix crash caused by multi threads

This commit is contained in:
dijunkun
2024-06-12 16:24:28 +08:00
parent 6f931f02ff
commit 4dd6294622
3 changed files with 63 additions and 42 deletions

View File

@@ -44,6 +44,7 @@ SignalServer::~SignalServer() {}
bool SignalServer::on_open(websocketpp::connection_hdl hdl) {
ws_connections_[hdl] = ws_connection_id_++;
LOG_INFO("New websocket connection [{}] established", ws_connection_id_);
json message = {{"type", "ws_connection_id"},
@@ -55,27 +56,30 @@ bool SignalServer::on_open(websocketpp::connection_hdl hdl) {
bool SignalServer::on_close(websocketpp::connection_hdl hdl) {
std::string user_id = transmission_manager_.ReleaseUserFromeWsHandle(hdl);
LOG_INFO("Websocket onnection [{}|{}] closed", ws_connections_[hdl], user_id);
if (!user_id.empty()) {
LOG_INFO("Websocket onnection [{}|{}] closed", ws_connections_[hdl],
user_id);
std::string transmission_id = transmission_manager_.IsHost(user_id);
std::string transmission_id = transmission_manager_.IsHost(user_id);
if (!transmission_id.empty()) {
transmission_manager_.ReleaseTransmission(transmission_id);
LOG_INFO("Release transmission [{}] due to host leaves", transmission_id);
} else {
transmission_id = transmission_manager_.IsGuest(user_id);
}
if (!transmission_id.empty()) {
transmission_manager_.ReleaseTransmission(transmission_id);
LOG_INFO("Release transmission [{}] due to host leaves", transmission_id);
} else {
transmission_id = transmission_manager_.IsGuest(user_id);
}
if (!transmission_id.empty()) {
json message = {{"type", "user_leave_transmission"},
{"transmission_id", transmission_id},
{"user_id", user_id}};
if (!transmission_id.empty()) {
json message = {{"type", "user_leave_transmission"},
{"transmission_id", transmission_id},
{"user_id", user_id}};
std::vector<std::string> user_id_list =
transmission_manager_.GetAllUserIdOfTransmission(transmission_id);
std::vector<std::string> user_id_list =
transmission_manager_.GetAllUserIdOfTransmission(transmission_id);
for (const auto& user_id : user_id_list) {
send_msg(transmission_manager_.GetWsHandle(user_id), message);
for (const auto& user_id : user_id_list) {
send_msg(transmission_manager_.GetWsHandle(user_id), message);
}
}
}
@@ -125,8 +129,9 @@ void SignalServer::on_message(websocketpp::connection_hdl hdl,
std::string transmission_id = j["transmission_id"].get<std::string>();
std::string password = j["password"].get<std::string>();
std::string host_id = j["user_id"].get<std::string>();
LOG_INFO("Receive host id [{}] create transmission request with id [{}]",
host_id, transmission_id);
LOG_INFO(
"Receive host id [{}|{}] create transmission request with id [{}]",
host_id, hdl.lock().get(), transmission_id);
if (!transmission_manager_.IsTransmissionExist(transmission_id)) {
if (transmission_id.empty()) {
transmission_id = GenerateTransmissionId();

View File

@@ -25,10 +25,18 @@ bool TransmissionManager::IsTransmissionExist(
bool TransmissionManager::ReleaseTransmission(
const std::string& transmission_id) {
std::lock_guard<std::recursive_mutex> lock(ws_hdl_alive_checker_mutex_);
if (transmission_guest_id_list_.end() !=
transmission_guest_id_list_.find(transmission_id)) {
auto guest_id_list = transmission_guest_id_list_[transmission_id];
for (auto& guest_id : guest_id_list) {
auto hdl = GetWsHandle(guest_id);
if (ws_hdl_iter_list_.find(hdl) != ws_hdl_iter_list_.end()) {
auto iter = ws_hdl_iter_list_[hdl];
ws_hdl_last_active_time_list_.erase(iter);
ws_hdl_iter_list_.erase(hdl);
}
if (user_id_ws_hdl_list_.find(guest_id) != user_id_ws_hdl_list_.end()) {
LOG_INFO("Remove user id [{}] from transmission [{}]", guest_id,
transmission_id);
@@ -42,6 +50,13 @@ bool TransmissionManager::ReleaseTransmission(
if (transmission_host_id_list_.end() !=
transmission_host_id_list_.find(transmission_id)) {
auto host_id = transmission_host_id_list_[transmission_id];
auto hdl = GetWsHandle(host_id);
if (ws_hdl_iter_list_.find(hdl) != ws_hdl_iter_list_.end()) {
auto iter = ws_hdl_iter_list_[hdl];
ws_hdl_last_active_time_list_.erase(iter);
ws_hdl_iter_list_.erase(hdl);
}
if (user_id_ws_hdl_list_.find(host_id) != user_id_ws_hdl_list_.end()) {
LOG_INFO("Remove user id [{}] from transmission [{}]", host_id,
transmission_id);
@@ -121,7 +136,7 @@ bool TransmissionManager::BindGuestToTransmission(
if (transmission_guest_id_list_.find(transmission_id) ==
transmission_guest_id_list_.end()) {
transmission_guest_id_list_[transmission_id].push_back(guest_id);
LOG_INFO("Bind guest id [{}] to transmission [{}]", guest_id,
LOG_INFO("Bind guest id [{}] to transmission [{}]", guest_id,
transmission_id);
return true;
} else {
@@ -242,7 +257,6 @@ websocketpp::connection_hdl TransmissionManager::GetWsHandle(
std::string TransmissionManager::GetUserId(websocketpp::connection_hdl hdl) {
for (auto it = user_id_ws_hdl_list_.begin(); it != user_id_ws_hdl_list_.end();
++it) {
// LOG_INFO("[{}]", it->first);
if (it->second.lock().get() == hdl.lock().get()) return it->first;
}
return "";
@@ -274,7 +288,8 @@ std::string TransmissionManager::GetPassword(
int TransmissionManager::UpdateWsHandleLastActiveTime(
websocketpp::connection_hdl hdl) {
// if already record last active time
std::lock_guard<std::mutex> lock(ws_hdl_alive_checker_mutex_);
std::lock_guard<std::recursive_mutex> lock(ws_hdl_alive_checker_mutex_);
if (ws_hdl_iter_list_.find(hdl) != ws_hdl_iter_list_.end()) {
auto it = ws_hdl_iter_list_[hdl];
if (it != ws_hdl_last_active_time_list_.end()) {
@@ -283,9 +298,7 @@ int TransmissionManager::UpdateWsHandleLastActiveTime(
}
uint32_t now_time =
std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::high_resolution_clock::now().time_since_epoch())
.count();
std::chrono::system_clock::now().time_since_epoch().count();
ws_hdl_last_active_time_list_.push_front(std::make_pair(hdl, now_time));
ws_hdl_iter_list_[hdl] = ws_hdl_last_active_time_list_.begin();
@@ -294,37 +307,40 @@ int TransmissionManager::UpdateWsHandleLastActiveTime(
void TransmissionManager::AliveChecker() {
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::this_thread::sleep_for(std::chrono::seconds(10));
std::lock_guard<std::mutex> lock(ws_hdl_alive_checker_mutex_);
std::lock_guard<std::recursive_mutex> lock(ws_hdl_alive_checker_mutex_);
while (!ws_hdl_last_active_time_list_.empty()) {
auto hdl = ws_hdl_last_active_time_list_.back().first;
if (hdl.expired()) {
break;
}
uint32_t now_time =
std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::high_resolution_clock::now().time_since_epoch())
.count();
bool is_dead = now_time - ws_hdl_last_active_time_list_.back().second > 2;
std::chrono::system_clock::now().time_since_epoch().count();
uint32_t duration =
now_time - ws_hdl_last_active_time_list_.back().second;
bool is_dead = duration > 100000000 ? true : false;
if (is_dead) {
auto hdl = ws_hdl_last_active_time_list_.back().first;
LOG_INFO("Websocket handle [{}] is dead", hdl.lock().get());
if (ws_hdl_iter_list_.find(hdl) != ws_hdl_iter_list_.end()) {
auto it = ws_hdl_iter_list_[hdl];
ws_hdl_last_active_time_list_.pop_back();
auto user_id = GetUserId(hdl);
auto transmission_id = IsHost(user_id);
if (transmission_id.empty()) {
LOG_INFO("Host [{}] is dead, release transmission [{}]", user_id,
transmission_id);
LOG_INFO("Host [{}|{}] is dead, release transmission [{}]", user_id,
hdl.lock().get(), transmission_id);
ReleaseTransmission(transmission_id);
} else {
transmission_id = IsGuest(user_id);
if (transmission_id.empty()) {
LOG_INFO("Guest [{}] is dead, release it from transmission [{}]",
user_id, transmission_id);
ReleaseUserFromeWsHandle(hdl);
}
}
ws_hdl_last_active_time_list_.pop_back();
ws_hdl_iter_list_.erase(hdl);
}
} else {
break;
}
}
}

View File

@@ -66,7 +66,7 @@ class TransmissionManager {
std::owner_less<websocketpp::connection_hdl>>
ws_hdl_iter_list_;
std::thread ws_hdl_alive_checker_;
std::mutex ws_hdl_alive_checker_mutex_;
std::recursive_mutex ws_hdl_alive_checker_mutex_;
};
#endif