From b2fef4d6881a27537ceb2af2f75a5c290d662d25 Mon Sep 17 00:00:00 2001 From: dijunkun Date: Sat, 28 Feb 2026 13:54:55 +0800 Subject: [PATCH] [feat] add device presence support --- src/device_db_manager/device_db_manager.cpp | 158 ++++++++++++++++++++ src/device_db_manager/device_db_manager.h | 8 + src/presence_manager.cpp | 70 +++++++++ src/presence_manager.h | 55 +++++++ src/signal_negotiation.cpp | 9 +- src/signal_negotiation.h | 6 +- src/signal_server.cpp | 64 +++++++- src/signal_server.h | 6 +- xmake.lua | 7 +- 9 files changed, 369 insertions(+), 14 deletions(-) create mode 100644 src/presence_manager.cpp create mode 100644 src/presence_manager.h diff --git a/src/device_db_manager/device_db_manager.cpp b/src/device_db_manager/device_db_manager.cpp index 13d4e75..c3b54bd 100644 --- a/src/device_db_manager/device_db_manager.cpp +++ b/src/device_db_manager/device_db_manager.cpp @@ -54,6 +54,19 @@ void DeviceDBManager::InitDB() { "INSERT INTO device_id_seq (next_id) " "SELECT 1 WHERE NOT EXISTS (SELECT 1 FROM device_id_seq);"; + const char* sql_presence = + "CREATE TABLE IF NOT EXISTS device_presence (" + "device_id TEXT PRIMARY KEY," + "online INTEGER NOT NULL," + "updated_at INTEGER NOT NULL" + ");"; + const char* sql_user_devices = + "CREATE TABLE IF NOT EXISTS user_devices (" + "user_id TEXT NOT NULL," + "device_id TEXT NOT NULL," + "PRIMARY KEY(user_id, device_id)" + ");"; + char* err_msg = nullptr; if (sqlite3_exec(db_, sql_devices, nullptr, nullptr, &err_msg) != SQLITE_OK) { @@ -74,6 +87,19 @@ void DeviceDBManager::InitDB() { sqlite3_free(err_msg); return; } + + if (sqlite3_exec(db_, sql_presence, nullptr, nullptr, &err_msg) != + SQLITE_OK) { + LOG_ERROR("Failed to create device_presence table: {}", err_msg); + sqlite3_free(err_msg); + return; + } + if (sqlite3_exec(db_, sql_user_devices, nullptr, nullptr, &err_msg) != + SQLITE_OK) { + LOG_ERROR("Failed to create user_devices table: {}", err_msg); + sqlite3_free(err_msg); + return; + } } std::string DeviceDBManager::Sha256(const std::string& str) { @@ -384,6 +410,70 @@ bool DeviceDBManager::UpdatePassword(const std::string& device_id, return success; } +bool DeviceDBManager::SetUserDevices( + const std::string& user_id, const std::vector& device_ids) { + if (db_ == nullptr) { + LOG_ERROR("Database is not initialized in SetUserDevices."); + return false; + } + sqlite3_exec(db_, "BEGIN TRANSACTION;", nullptr, nullptr, nullptr); + const char* del_sql = "DELETE FROM user_devices WHERE user_id = ?;"; + sqlite3_stmt* del_stmt = nullptr; + if (sqlite3_prepare_v2(db_, del_sql, -1, &del_stmt, nullptr) != SQLITE_OK) { + sqlite3_exec(db_, "ROLLBACK;", nullptr, nullptr, nullptr); + return false; + } + sqlite3_bind_text(del_stmt, 1, user_id.c_str(), -1, SQLITE_TRANSIENT); + if (sqlite3_step(del_stmt) != SQLITE_DONE) { + sqlite3_finalize(del_stmt); + sqlite3_exec(db_, "ROLLBACK;", nullptr, nullptr, nullptr); + return false; + } + sqlite3_finalize(del_stmt); + + const char* ins_sql = + "INSERT INTO user_devices (user_id, device_id) VALUES (?, ?);"; + sqlite3_stmt* ins_stmt = nullptr; + if (sqlite3_prepare_v2(db_, ins_sql, -1, &ins_stmt, nullptr) != SQLITE_OK) { + sqlite3_exec(db_, "ROLLBACK;", nullptr, nullptr, nullptr); + return false; + } + for (const auto& id : device_ids) { + sqlite3_bind_text(ins_stmt, 1, user_id.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_bind_text(ins_stmt, 2, id.c_str(), -1, SQLITE_TRANSIENT); + if (sqlite3_step(ins_stmt) != SQLITE_DONE) { + sqlite3_finalize(ins_stmt); + sqlite3_exec(db_, "ROLLBACK;", nullptr, nullptr, nullptr); + return false; + } + sqlite3_reset(ins_stmt); + sqlite3_clear_bindings(ins_stmt); + } + sqlite3_finalize(ins_stmt); + sqlite3_exec(db_, "COMMIT;", nullptr, nullptr, nullptr); + return true; +} + +std::vector DeviceDBManager::GetUserDevices( + const std::string& user_id) { + std::vector devices; + if (db_ == nullptr) { + LOG_ERROR("Database is not initialized in GetUserDevices."); + return devices; + } + const char* sql = "SELECT device_id FROM user_devices WHERE user_id = ?;"; + sqlite3_stmt* stmt = nullptr; + if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) { + return devices; + } + sqlite3_bind_text(stmt, 1, user_id.c_str(), -1, SQLITE_TRANSIENT); + while (sqlite3_step(stmt) == SQLITE_ROW) { + devices.emplace_back( + reinterpret_cast(sqlite3_column_text(stmt, 0))); + } + sqlite3_finalize(stmt); + return devices; +} bool DeviceDBManager::RemoveDevice(const std::string& device_id) { if (db_ == nullptr) { LOG_ERROR("Database is not initialized in RemoveDevice."); @@ -402,3 +492,71 @@ bool DeviceDBManager::RemoveDevice(const std::string& device_id) { sqlite3_finalize(stmt); return success; } + +bool DeviceDBManager::SetDeviceOnline(const std::string& device_id, + bool online) { + if (db_ == nullptr) { + LOG_ERROR("Database is not initialized in SetDeviceOnline."); + return false; + } + + const char* sql = + "INSERT INTO device_presence (device_id, online, updated_at) " + "VALUES (?, ?, strftime('%s','now')) " + "ON CONFLICT(device_id) DO UPDATE SET online=excluded.online, " + "updated_at=excluded.updated_at;"; + + sqlite3_stmt* stmt = nullptr; + if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) { + return false; + } + sqlite3_bind_text(stmt, 1, device_id.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_bind_int(stmt, 2, online ? 1 : 0); + bool ok = (sqlite3_step(stmt) == SQLITE_DONE); + sqlite3_finalize(stmt); + return ok; +} + +std::vector> DeviceDBManager::BatchQueryOnline( + const std::vector& device_ids) { + std::vector> result; + if (db_ == nullptr) { + LOG_ERROR("Database is not initialized in BatchQueryOnline."); + return result; + } + if (device_ids.empty()) { + return result; + } + + std::stringstream ss; + ss << "SELECT device_id, online FROM device_presence WHERE device_id IN ("; + for (size_t i = 0; i < device_ids.size(); ++i) { + ss << (i == 0 ? "?" : ",?"); + } + ss << ");"; + std::string sql = ss.str(); + + sqlite3_stmt* stmt = nullptr; + if (sqlite3_prepare_v2(db_, sql.c_str(), -1, &stmt, nullptr) != SQLITE_OK) { + return result; + } + for (size_t i = 0; i < device_ids.size(); ++i) { + sqlite3_bind_text(stmt, static_cast(i + 1), device_ids[i].c_str(), -1, + SQLITE_TRANSIENT); + } + + std::unordered_map map; + while (sqlite3_step(stmt) == SQLITE_ROW) { + std::string id(reinterpret_cast(sqlite3_column_text(stmt, 0))); + int online = sqlite3_column_int(stmt, 1); + map[id] = (online != 0); + } + sqlite3_finalize(stmt); + + for (const auto& id : device_ids) { + auto it = map.find(id); + bool online = (it != map.end()) ? it->second : false; + result.emplace_back(id, online); + } + return result; +} diff --git a/src/device_db_manager/device_db_manager.h b/src/device_db_manager/device_db_manager.h index 6a4760d..bdcde13 100644 --- a/src/device_db_manager/device_db_manager.h +++ b/src/device_db_manager/device_db_manager.h @@ -10,6 +10,7 @@ #include #include +#include struct DeviceCredential { std::string device_id; @@ -34,6 +35,13 @@ class DeviceDBManager { int VerifyDevice(const std::string& device_id, const std::string& password); bool RemoveDevice(const std::string& device_id); + bool SetDeviceOnline(const std::string& device_id, bool online); + std::vector> BatchQueryOnline( + const std::vector& device_ids); + bool SetUserDevices(const std::string& user_id, + const std::vector& device_ids); + std::vector GetUserDevices(const std::string& user_id); + private: void InitDB(); std::string Sha256(const std::string& str); diff --git a/src/presence_manager.cpp b/src/presence_manager.cpp new file mode 100644 index 0000000..d59aeb1 --- /dev/null +++ b/src/presence_manager.cpp @@ -0,0 +1,70 @@ +#include "presence_manager.h" + +#include + +void PresenceManager::OnLogin(const std::string& user_id, + const std::string& device_id, + websocketpp::connection_hdl hdl) { + if (db_) { + db_->SetDeviceOnline(device_id, true); + } + NotifyUserDevices(user_id, device_id, true); +} + +void PresenceManager::OnLogout(const std::string& device_id) { + std::string user_id; + user_id = device_id; + if (db_) { + db_->SetDeviceOnline(device_id, false); + } + if (!user_id.empty()) { + NotifyUserDevices(user_id, device_id, false); + } +} + +bool PresenceManager::IsOnline(const std::string& device_id) const { + if (!db_) return false; + auto res = db_->BatchQueryOnline({device_id}); + return !res.empty() && res[0].second; +} + +std::vector> PresenceManager::BatchQuery( + const std::vector& device_ids) const { + std::vector> result; + if (db_) { + return db_->BatchQueryOnline(device_ids); + } + return result; +} + +void PresenceManager::NotifyUserDevices(const std::string& user_id, + const std::string& changed_device_id, + bool online) { + if (!db_ || !send_msg_) return; + auto devices = db_->GetUserDevices(user_id); + if (devices.empty()) return; + auto statuses = db_->BatchQueryOnline(devices); + std::vector targets; + for (const auto& p : statuses) { + if (p.first == changed_device_id) continue; + if (p.second) { + targets.push_back(p.first); + } + } + nlohmann::json j = { + {"type", "presence_update"}, + {"id", changed_device_id}, + {"online", online}, + }; + for (auto& id : targets) { + if (send_to_device_) { + send_to_device_(id, j); + } + } +} + +void PresenceManager::UpdateUserDevices( + const std::string& user_id, const std::vector& device_ids) { + if (!db_) return; + db_->SetUserDevices(user_id, device_ids); +} diff --git a/src/presence_manager.h b/src/presence_manager.h new file mode 100644 index 0000000..61b88d3 --- /dev/null +++ b/src/presence_manager.h @@ -0,0 +1,55 @@ +/* + * @Author: DI JUNKUN + * @Date: 2026-02-28 + * Copyright (c) 2026 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _PRESENCE_MANAGER_H_ +#define _PRESENCE_MANAGER_H_ + +#include +#include +#include +#include +#include + +#include "device_db_manager.h" + +using nlohmann::json; + +struct OnlineSession { + std::string user_id; + std::string device_id; + websocketpp::connection_hdl hdl; +}; + +class PresenceManager { + public: + void SetDeviceDB(DeviceDBManager* db) { db_ = db; } + void SetSendMsgCallback( + std::function send_msg) { + send_msg_ = send_msg; + } + void SetSendToDeviceCallback( + std::function send_to_device) { + send_to_device_ = send_to_device; + } + + void OnLogin(const std::string& user_id, const std::string& device_id, + websocketpp::connection_hdl hdl); + void OnLogout(const std::string& device_id); + bool IsOnline(const std::string& device_id) const; + std::vector> BatchQuery( + const std::vector& device_ids) const; + void NotifyUserDevices(const std::string& user_id, + const std::string& changed_device_id, bool online); + void UpdateUserDevices(const std::string& user_id, + const std::vector& device_ids); + + private: + std::function send_msg_; + DeviceDBManager* db_ = nullptr; + std::function send_to_device_; +}; + +#endif diff --git a/src/signal_negotiation.cpp b/src/signal_negotiation.cpp index a8c1591..54efa18 100644 --- a/src/signal_negotiation.cpp +++ b/src/signal_negotiation.cpp @@ -16,10 +16,9 @@ bool GetStringField(const json& j, const char* key, std::string& value) { SignalNegotiation::SignalNegotiation( std::shared_ptr transmission_manager, - std::string db_path) - : transmission_manager_(transmission_manager) { - device_db_manager_ = std::make_unique(db_path); -} + DeviceDBManager* device_db) + : transmission_manager_(transmission_manager), + device_db_manager_(device_db) {} SignalNegotiation::~SignalNegotiation() {} @@ -409,4 +408,4 @@ void SignalNegotiation::OnWebClientDisconnect(const std::string& user_id) { pure_user_id); } } -} \ No newline at end of file +} diff --git a/src/signal_negotiation.h b/src/signal_negotiation.h index cff8172..a2b9525 100644 --- a/src/signal_negotiation.h +++ b/src/signal_negotiation.h @@ -17,7 +17,7 @@ using nlohmann::json; class SignalNegotiation { public: SignalNegotiation(std::shared_ptr transmission_manager, - std::string db_path); + DeviceDBManager* device_db); ~SignalNegotiation(); void SetSendMsgCallback( @@ -37,8 +37,8 @@ class SignalNegotiation { private: std::shared_ptr transmission_manager_; - std::unique_ptr device_db_manager_; + DeviceDBManager* device_db_manager_; std::function send_msg_; }; -#endif \ No newline at end of file +#endif diff --git a/src/signal_server.cpp b/src/signal_server.cpp index cb56049..757d652 100644 --- a/src/signal_server.cpp +++ b/src/signal_server.cpp @@ -31,11 +31,21 @@ SignalServer::SignalServer() { std::placeholders::_2)); transmission_manager_ = std::make_shared(); - signal_negotiation_ = - std::make_unique(transmission_manager_, db_path_); + device_db_manager_ = std::make_unique(db_path_); + signal_negotiation_ = std::make_unique( + transmission_manager_, device_db_manager_.get()); signal_negotiation_->SetSendMsgCallback(std::bind(&SignalServer::SendMsg, this, std::placeholders::_1, std::placeholders::_2)); + presence_manager_ = std::make_unique(); + presence_manager_->SetSendMsgCallback(std::bind(&SignalServer::SendMsg, this, + std::placeholders::_1, + std::placeholders::_2)); + presence_manager_->SetDeviceDB(device_db_manager_.get()); + presence_manager_->SetSendToDeviceCallback( + [this](const std::string& id, json msg) { + SendMsg(transmission_manager_->GetWsHandle(id), msg); + }); } SignalServer::SignalServer(uint16_t port, std::string certs_dir, @@ -69,11 +79,21 @@ SignalServer::SignalServer(uint16_t port, std::string certs_dir, std::placeholders::_2)); transmission_manager_ = std::make_shared(); - signal_negotiation_ = - std::make_unique(transmission_manager_, db_path_); + device_db_manager_ = std::make_unique(db_path_); + signal_negotiation_ = std::make_unique( + transmission_manager_, device_db_manager_.get()); signal_negotiation_->SetSendMsgCallback(std::bind(&SignalServer::SendMsg, this, std::placeholders::_1, std::placeholders::_2)); + presence_manager_ = std::make_unique(); + presence_manager_->SetSendMsgCallback(std::bind(&SignalServer::SendMsg, this, + std::placeholders::_1, + std::placeholders::_2)); + presence_manager_->SetDeviceDB(device_db_manager_.get()); + presence_manager_->SetSendToDeviceCallback( + [this](const std::string& id, json msg) { + SendMsg(transmission_manager_->GetWsHandle(id), msg); + }); } SignalServer::~SignalServer() {} @@ -95,6 +115,9 @@ bool SignalServer::OnClose(websocketpp::connection_hdl hdl) { signal_negotiation_->OnWebClientDisconnect(user_id); } } + if (presence_manager_ && !user_id.empty()) { + presence_manager_->OnLogout(user_id); + } ws_connections_.erase(hdl); return true; } @@ -111,6 +134,9 @@ bool SignalServer::OnFail(websocketpp::connection_hdl hdl) { signal_negotiation_->OnWebClientDisconnect(user_id); } } + if (presence_manager_ && !user_id.empty()) { + presence_manager_->OnLogout(user_id); + } ws_connections_.erase(hdl); return true; } @@ -271,6 +297,12 @@ void SignalServer::OnMessage(websocketpp::connection_hdl hdl, } case "login"_H: signal_negotiation_->login_user(hdl, j); + if (presence_manager_) { + std::string id = transmission_manager_->GetUserId(hdl); + if (!id.empty()) { + presence_manager_->OnLogin(id, id, hdl); + } + } break; case "user_leave_transmission"_H: signal_negotiation_->leave_transmission(hdl, j); @@ -293,6 +325,30 @@ void SignalServer::OnMessage(websocketpp::connection_hdl hdl, case "new_candidate_mid"_H: signal_negotiation_->new_candidate_mid(hdl, j); break; + case "recent_connections_presence"_H: { + std::string user_id; + if (!j.contains("user_id") || !j["user_id"].is_string()) { + LOG_ERROR("recent_connections missing field: user_id"); + break; + } + user_id = j["user_id"].get(); + std::vector device_ids; + if (j.contains("devices") && j["devices"].is_array()) { + for (auto& v : j["devices"]) { + if (v.is_string()) device_ids.push_back(v.get()); + } + } + if (presence_manager_) { + presence_manager_->UpdateUserDevices(user_id, device_ids); + auto statuses = presence_manager_->BatchQuery(device_ids); + json resp = {{"type", "presence"}, {"devices", json::array()}}; + for (const auto& p : statuses) { + resp["devices"].push_back({{"id", p.first}, {"online", p.second}}); + } + server_.send(hdl, resp.dump(), websocketpp::frame::opcode::text); + } + break; + } default: LOG_WARN("Unknown message type: {}", type); break; diff --git a/src/signal_server.h b/src/signal_server.h index 0b5efb0..947eeac 100644 --- a/src/signal_server.h +++ b/src/signal_server.h @@ -14,6 +14,8 @@ #include #include +#include "device_db_manager.h" +#include "presence_manager.h" #include "signal_negotiation.h" using nlohmann::json; @@ -51,7 +53,9 @@ class SignalServer { unsigned int ws_connection_id_ = 0; std::shared_ptr transmission_manager_; + std::unique_ptr device_db_manager_; std::unique_ptr signal_negotiation_; + std::unique_ptr presence_manager_; }; -#endif \ No newline at end of file +#endif diff --git a/xmake.lua b/xmake.lua index 8cb0ca9..b51ebfb 100644 --- a/xmake.lua +++ b/xmake.lua @@ -52,9 +52,14 @@ target("negotiation") add_files("src/signal_negotiation.cpp") add_includedirs("src", {public = true}) -target("server") +target("presence") set_kind("object") add_deps("log", "common", "negotiation") + add_files("src/presence_manager.cpp") + +target("server") + set_kind("object") + add_deps("log", "common", "negotiation", "presence") add_files("src/signal_server.cpp") target("crossdesk_server")