Skip to content
Snippets Groups Projects
Commit e5a5cd08 authored by Éloïs's avatar Éloïs
Browse files

[test] server: test save_peer & receive_new_heads

parent fb64bae4
Branches
Tags
1 merge request!1338Peers heads
...@@ -1253,6 +1253,7 @@ name = "duniter-server" ...@@ -1253,6 +1253,7 @@ name = "duniter-server"
version = "1.8.1" version = "1.8.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"cfg-if 1.0.0",
"dubp", "dubp",
"duniter-conf", "duniter-conf",
"duniter-dbs", "duniter-dbs",
......
...@@ -29,6 +29,8 @@ use duniter_mempools::Mempools; ...@@ -29,6 +29,8 @@ use duniter_mempools::Mempools;
use fast_threadpool::{JoinHandle, ThreadPoolDisconnected}; use fast_threadpool::{JoinHandle, ThreadPoolDisconnected};
use std::path::Path; use std::path::Path;
pub const SOFTWARE_NAME: &str = "duniter";
pub type Endpoint = String; pub type Endpoint = String;
#[async_trait::async_trait] #[async_trait::async_trait]
......
...@@ -7,6 +7,7 @@ edition = "2018" ...@@ -7,6 +7,7 @@ edition = "2018"
[dependencies] [dependencies]
anyhow = "1.0.34" anyhow = "1.0.34"
cfg-if = "1.0.0"
dubp = { version = "0.32.3" } dubp = { version = "0.32.3" }
duniter-conf = { path = "../duniter-conf" } duniter-conf = { path = "../duniter-conf" }
duniter-dbs = { path = "../duniter-dbs" } duniter-dbs = { path = "../duniter-dbs" }
...@@ -22,3 +23,6 @@ log = "0.4.11" ...@@ -22,3 +23,6 @@ log = "0.4.11"
paste = "1.0.2" paste = "1.0.2"
resiter = "0.4.0" resiter = "0.4.0"
tokio = { version = "0.2.22", features = ["io-util", "rt-threaded"] } tokio = { version = "0.2.22", features = ["io-util", "rt-threaded"] }
[dev-dependencies]
duniter-dbs = { path = "../duniter-dbs", features = ["mem"] }
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Legacy functions intended to be used by DuniterJs uniquely. This module will be removed when the migration is complete.
mod block_indexer;
mod dunp;
mod tx_history;
mod txs_mempool;
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::*;
impl DuniterServer {
pub fn apply_block(&mut self, block: DubpBlockV10Stringified) -> KvResult<()> {
let block = Arc::new(
DubpBlockV10::from_string_object(&block)
.map_err(|e| KvError::DeserError(format!("{}", e)))?,
);
self.current = Some(duniter_dbs_write_ops::apply_block::apply_block(
&self.bc_db,
block.clone(),
self.current,
&self.dbs_pool,
false,
)?);
apply_block_modules(block, &self.conf, &self.dbs_pool, None)
}
pub fn apply_chunk_of_blocks(&mut self, blocks: Vec<DubpBlockV10Stringified>) -> KvResult<()> {
log::debug!("apply_chunk(#{})", blocks[0].number);
let blocks = Arc::from(
blocks
.into_iter()
.map(|block| DubpBlockV10::from_string_object(&block))
.collect::<Result<Vec<_>, _>>()
.map_err(|e| KvError::DeserError(format!("{}", e)))?,
);
self.current = Some(duniter_dbs_write_ops::apply_block::apply_chunk(
&self.bc_db,
self.current,
&self.dbs_pool,
blocks.clone(),
)?);
apply_chunk_of_blocks_modules(blocks, &self.conf, &self.dbs_pool, None)
}
pub fn revert_block(&mut self, block: DubpBlockV10Stringified) -> KvResult<()> {
let block = Arc::new(
DubpBlockV10::from_string_object(&block)
.map_err(|e| KvError::DeserError(format!("{}", e)))?,
);
let block_arc_clone = Arc::clone(&block);
let txs_mp_job_handle = self
.dbs_pool
.launch(move |dbs| {
duniter_dbs_write_ops::txs_mp::revert_block(
block_arc_clone.transactions(),
&dbs.txs_mp_db,
)
})
.expect("dbs pool disconnected");
self.current = duniter_dbs_write_ops::bc::revert_block(&self.bc_db, &block)?;
txs_mp_job_handle.join().expect("dbs pool disconnected")?;
revert_block_modules(block, &self.conf, &self.dbs_pool, None)
}
}
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::*;
impl DuniterServer {
pub fn receive_new_heads(
&self,
heads: Vec<(duniter_dbs::DunpNodeIdV1Db, duniter_dbs::DunpHeadDbV1)>,
) -> KvResult<()> {
self.dbs_pool
.execute(move |dbs| {
for (dunp_node_id, dunp_head) in heads {
dbs.cm_db
.dunp_heads_old_write()
.upsert(dunp_node_id, dunp_head)?
}
Ok::<(), KvError>(())
})
.expect("dbs pool disconnected")
}
pub fn remove_all_peers(&self) -> KvResult<()> {
use duniter_dbs::databases::dunp_v1::DunpV1DbWritable as _;
self.dbs_pool
.execute(move |dbs| dbs.dunp_db.peers_old_write().clear())
.expect("dbs pool disconnected")
}
pub fn remove_peer_by_pubkey(&self, pubkey: PublicKey) -> KvResult<()> {
use duniter_dbs::databases::dunp_v1::DunpV1DbWritable as _;
self.dbs_pool
.execute(move |dbs| dbs.dunp_db.peers_old_write().remove(PubKeyKeyV2(pubkey)))
.expect("dbs pool disconnected")
}
pub fn save_peer(&self, new_peer_card: PeerCardStringified) -> anyhow::Result<()> {
use dubp::crypto::keys::PublicKey as _;
let pubkey = PublicKey::from_base58(&new_peer_card.pubkey)?;
use duniter_dbs::databases::dunp_v1::DunpV1DbWritable as _;
self.dbs_pool
.execute(move |dbs| {
dbs.dunp_db.peers_old_write().upsert(
PubKeyKeyV2(pubkey),
duniter_dbs::PeerCardDbV1 {
version: new_peer_card.version,
currency: new_peer_card.currency,
pubkey: new_peer_card.pubkey,
blockstamp: new_peer_card.blockstamp,
endpoints: new_peer_card.endpoints,
status: new_peer_card.status,
signature: new_peer_card.signature,
},
)
})
.expect("dbs pool disconnected")
.map_err(|e| e.into())
}
pub fn update_self_peer(&self, new_peer_card: PeerCardStringified) {
self.dbs_pool
.execute(move |dbs| {
dbs.cm_db
.self_peer_old_write()
.upsert(
(),
duniter_dbs::PeerCardDbV1 {
version: new_peer_card.version,
currency: new_peer_card.currency,
pubkey: new_peer_card.pubkey,
blockstamp: new_peer_card.blockstamp,
endpoints: new_peer_card.endpoints,
status: new_peer_card.status,
signature: new_peer_card.signature,
},
)
.expect("fail to write on memory db")
})
.expect("dbs pool disconnected")
}
}
#[cfg(test)]
mod tests {
use dubp::crypto::keys::{
ed25519::{PublicKey, Signature},
PublicKey as _,
};
use duniter_dbs::PeerCardDbV1;
use super::*;
#[test]
fn test_receive_new_heads() -> anyhow::Result<()> {
use duniter_dbs::databases::cm_v1::CmV1DbReadable as _;
let (server, dbs) = DuniterServer::test(DuniterConf::default())?;
let head = (
duniter_dbs::DunpNodeIdV1Db::new(53, PublicKey::default()),
duniter_dbs::DunpHeadDbV1 {
api: "WS2P".to_owned(),
pubkey: PublicKey::default(),
blockstamp: Blockstamp::default(),
software: duniter_module::SOFTWARE_NAME.to_owned(),
software_version: "test".to_owned(),
pow_prefix: 1,
free_member_room: 0,
free_mirror_room: 0,
signature: Signature::default(),
},
);
assert_eq!(dbs.cm_db.dunp_heads_old().count()?, 0);
server.receive_new_heads(vec![head.clone()])?;
assert_eq!(dbs.cm_db.dunp_heads_old().count()?, 1);
assert_eq!(dbs.cm_db.dunp_heads_old().get(&head.0)?, Some(head.1));
Ok(())
}
#[test]
fn test_save_peer() -> anyhow::Result<()> {
use duniter_dbs::databases::dunp_v1::DunpV1DbReadable as _;
let (server, dbs) = DuniterServer::test(DuniterConf::default())?;
let peer = PeerCardStringified {
version: 0,
currency: "test".to_owned(),
pubkey: "82NdD9eEbXSjRJXeJdqf56xkpu6taTfTeEqtAtmtbyXY".to_owned(),
blockstamp: "379922-0000001D97770A8203062F9E618F29FFAA2EF4218649FCE6DD13E01C3932E943".to_owned(),
endpoints: vec![],
status: "UP".to_owned(),
signature: "KBaoJuKIfkWJO015BTegUN8l81VYPfleVUfQUwPRPAAF1oB398hDb1bX/QUFe+3CKFz57aGT8bB745mz90x5Ag==".to_owned(),
};
let pubkey = PublicKey::from_base58(&peer.pubkey)?;
assert_eq!(dbs.dunp_db.peers_old().count()?, 0);
server.save_peer(peer.clone())?;
assert_eq!(dbs.dunp_db.peers_old().count()?, 1);
let peer_db = dbs.dunp_db.peers_old().get(&PubKeyKeyV2(pubkey))?;
assert_eq!(
peer_db,
Some(PeerCardDbV1 {
version: peer.version,
currency: peer.currency,
pubkey: peer.pubkey,
blockstamp: peer.blockstamp,
endpoints: peer.endpoints,
status: peer.status,
signature: peer.signature,
})
);
Ok(())
}
}
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::*;
impl DuniterServer {
pub fn get_transactions_history(&self, pubkey: PublicKey) -> KvResult<TxsHistory> {
self.dbs_pool
.execute(move |dbs| {
duniter_gva_dbs_reader::txs_history::get_transactions_history_for_bma(
&dbs.gva_db,
&dbs.txs_mp_db,
pubkey,
)
})
.expect("dbs pool disconnected")
}
pub fn get_tx_by_hash(
&self,
hash: Hash,
) -> KvResult<Option<(TransactionDocumentV10, Option<BlockNumber>)>> {
self.dbs_pool
.execute(move |dbs| {
if let Some(tx) = dbs.txs_mp_db.txs().get(&HashKeyV2(hash))? {
Ok(Some((tx.0, None)))
} else if let Some(tx_db) = dbs.gva_db.txs().get(&HashKeyV2(hash))? {
Ok(Some((tx_db.tx, Some(tx_db.written_block.number))))
} else {
Ok(None)
}
})
.expect("dbs pool disconnected")
}
}
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::*;
impl DuniterServer {
pub fn accept_new_tx(
&self,
tx: TransactionDocumentV10,
server_pubkey: PublicKey,
) -> KvResult<bool> {
let txs_mempool = self.txs_mempool;
match self
.dbs_pool
.execute(move |dbs| {
txs_mempool.accept_new_tx(&dbs.bc_db_ro, server_pubkey, tx, &dbs.txs_mp_db)
})
.expect("dbs pool discorrected")
{
Ok(()) => Ok(true),
Err(TxMpError::Db(e)) => Err(e),
Err(_) => Ok(false),
}
}
pub fn add_pending_tx_force(&self, tx: TransactionDocumentV10) -> KvResult<()> {
let txs_mempool = self.txs_mempool;
self.dbs_pool
.execute(move |dbs| txs_mempool.add_pending_tx_force(&dbs.txs_mp_db, &tx))
.expect("dbs pool disconnected")
}
pub fn get_self_endpoints(&self) -> anyhow::Result<Vec<Endpoint>> {
if let Some(self_peer) = self
.dbs_pool
.execute(|dbs| dbs.cm_db.self_peer_old().get(&()))?
.context("fail to get self endpoints")?
{
Ok(self_peer.endpoints)
} else {
Ok(vec![])
}
}
pub fn get_mempool_txs_free_rooms(&self) -> KvResult<usize> {
let txs_mempool = self.txs_mempool;
self.dbs_pool
.execute(move |dbs| txs_mempool.get_free_rooms(&dbs.txs_mp_db))
.expect("dbs pool discorrected")
}
pub fn get_new_pending_txs(&self) -> KvResult<Vec<TransactionDocumentV10>> {
let mut new_pending_txs = BTreeMap::new();
for events in self.pending_txs_subscriber.drain() {
use std::ops::Deref as _;
for event in events.deref() {
match event {
duniter_dbs::databases::txs_mp_v2::TxsEvent::Upsert { key, value } => {
new_pending_txs.insert(key.0, value.0.clone());
}
duniter_dbs::databases::txs_mp_v2::TxsEvent::Remove { key } => {
new_pending_txs.remove(&key.0);
}
_ => (),
}
}
}
Ok(new_pending_txs.into_iter().map(|(_k, v)| v).collect())
}
pub fn get_pending_txs(
&self,
_blockchain_time: i64,
min_version: usize,
) -> KvResult<Vec<PendingTxDbV2>> {
self.dbs_pool
.execute(move |dbs| {
dbs.txs_mp_db.txs().iter(.., |it| {
it.values()
.filter_ok(|tx| tx.0.version() >= min_version)
.collect()
})
})
.expect("dbs pool disconnected")
}
pub fn remove_all_pending_txs(&self) -> KvResult<()> {
self.dbs_pool
.execute(move |dbs| {
duniter_dbs_write_ops::txs_mp::remove_all_pending_txs(&dbs.txs_mp_db)
})
.expect("dbs pool disconnected")
}
pub fn remove_pending_tx_by_hash(&self, hash: Hash) -> KvResult<()> {
self.dbs_pool
.execute(move |dbs| {
duniter_dbs_write_ops::txs_mp::remove_pending_tx_by_hash(&dbs.txs_mp_db, hash)
})
.expect("dbs pool disconnected")
}
pub fn trim_expired_non_written_txs(&self, limit_time: i64) -> KvResult<()> {
self.dbs_pool
.execute(move |dbs| {
duniter_dbs_write_ops::txs_mp::trim_expired_non_written_txs(
&dbs.txs_mp_db,
limit_time,
)
})
.expect("dbs pool disconnected")
}
}
...@@ -22,6 +22,8 @@ ...@@ -22,6 +22,8 @@
unused_import_braces unused_import_braces
)] )]
mod legacy;
pub use duniter_conf::DuniterConf; pub use duniter_conf::DuniterConf;
pub use duniter_dbs::{kv_typed::prelude::KvResult, smallvec, DunpHeadDbV1, DunpNodeIdV1Db}; pub use duniter_dbs::{kv_typed::prelude::KvResult, smallvec, DunpHeadDbV1, DunpNodeIdV1Db};
pub use duniter_gva::{GvaConf, GvaModule, PeerCardStringified}; pub use duniter_gva::{GvaConf, GvaModule, PeerCardStringified};
...@@ -69,14 +71,29 @@ pub struct DuniterServer { ...@@ -69,14 +71,29 @@ pub struct DuniterServer {
plug_duniter_modules!([GvaModule]); plug_duniter_modules!([GvaModule]);
#[cfg(not(test))]
type DuniterServerInstance = DuniterServer;
#[cfg(test)]
type DuniterServerInstance = (DuniterServer, DuniterDbs<FileBackend>);
impl DuniterServer { impl DuniterServer {
#[cfg(test)]
pub(crate) fn test(conf: DuniterConf) -> anyhow::Result<DuniterServerInstance> {
DuniterServer::start(
None,
conf,
"test".to_owned(),
None,
duniter_module::SOFTWARE_NAME,
)
}
pub fn start( pub fn start(
command_name: Option<String>, command_name: Option<String>,
conf: DuniterConf, conf: DuniterConf,
currency: String, currency: String,
home_path_opt: Option<&Path>, home_path_opt: Option<&Path>,
software_version: &'static str, software_version: &'static str,
) -> anyhow::Result<Self> { ) -> anyhow::Result<DuniterServerInstance> {
let command = match command_name.unwrap_or_default().as_str() { let command = match command_name.unwrap_or_default().as_str() {
"sync" => DuniterCommand::Sync, "sync" => DuniterCommand::Sync,
_ => DuniterCommand::Start, _ => DuniterCommand::Start,
...@@ -103,6 +120,11 @@ impl DuniterServer { ...@@ -103,6 +120,11 @@ impl DuniterServer {
let threadpool = if home_path_opt.is_some() { let threadpool = if home_path_opt.is_some() {
log::info!("start dbs threadpool..."); log::info!("start dbs threadpool...");
#[cfg(test)]
let threadpool =
fast_threadpool::ThreadPool::start(ThreadPoolConfig::default(), dbs.clone());
#[cfg(not(test))]
let threadpool = fast_threadpool::ThreadPool::start(ThreadPoolConfig::default(), dbs); let threadpool = fast_threadpool::ThreadPool::start(ThreadPoolConfig::default(), dbs);
if command != DuniterCommand::Sync && conf.gva.is_some() { if command != DuniterCommand::Sync && conf.gva.is_some() {
...@@ -126,272 +148,30 @@ impl DuniterServer { ...@@ -126,272 +148,30 @@ impl DuniterServer {
}); });
} }
threadpool threadpool
} else {
cfg_if::cfg_if! {
if #[cfg(test)] {
fast_threadpool::ThreadPool::start(ThreadPoolConfig::low(), dbs.clone())
} else { } else {
fast_threadpool::ThreadPool::start(ThreadPoolConfig::low(), dbs) fast_threadpool::ThreadPool::start(ThreadPoolConfig::low(), dbs)
}
}
}; };
Ok(DuniterServer { let duniter_server = DuniterServer {
bc_db, bc_db,
conf, conf,
current, current,
dbs_pool: threadpool.into_sync_handler(), dbs_pool: threadpool.into_sync_handler(),
pending_txs_subscriber, pending_txs_subscriber,
txs_mempool, txs_mempool,
}) };
} cfg_if::cfg_if! {
if #[cfg(test)] {
/* Ok((duniter_server, dbs))
* READ FUNCTIONS FOR DUNITER JS ONLY
*/
pub fn get_self_endpoints(&self) -> anyhow::Result<Vec<Endpoint>> {
if let Some(self_peer) = self
.dbs_pool
.execute(|dbs| dbs.cm_db.self_peer_old().get(&()))?
.context("fail to get self endpoints")?
{
Ok(self_peer.endpoints)
} else {
Ok(vec![])
}
}
pub fn accept_new_tx(
&self,
tx: TransactionDocumentV10,
server_pubkey: PublicKey,
) -> KvResult<bool> {
let txs_mempool = self.txs_mempool;
match self
.dbs_pool
.execute(move |dbs| {
txs_mempool.accept_new_tx(&dbs.bc_db_ro, server_pubkey, tx, &dbs.txs_mp_db)
})
.expect("dbs pool discorrected")
{
Ok(()) => Ok(true),
Err(TxMpError::Db(e)) => Err(e),
Err(_) => Ok(false),
}
}
pub fn get_mempool_txs_free_rooms(&self) -> KvResult<usize> {
let txs_mempool = self.txs_mempool;
self.dbs_pool
.execute(move |dbs| txs_mempool.get_free_rooms(&dbs.txs_mp_db))
.expect("dbs pool discorrected")
}
pub fn get_new_pending_txs(&self) -> KvResult<Vec<TransactionDocumentV10>> {
let mut new_pending_txs = BTreeMap::new();
for events in self.pending_txs_subscriber.drain() {
use std::ops::Deref as _;
for event in events.deref() {
match event {
duniter_dbs::databases::txs_mp_v2::TxsEvent::Upsert { key, value } => {
new_pending_txs.insert(key.0, value.0.clone());
}
duniter_dbs::databases::txs_mp_v2::TxsEvent::Remove { key } => {
new_pending_txs.remove(&key.0);
}
_ => (),
}
}
}
Ok(new_pending_txs.into_iter().map(|(_k, v)| v).collect())
}
pub fn get_pending_txs(
&self,
_blockchain_time: i64,
min_version: usize,
) -> KvResult<Vec<PendingTxDbV2>> {
self.dbs_pool
.execute(move |dbs| {
dbs.txs_mp_db.txs().iter(.., |it| {
it.values()
.filter_ok(|tx| tx.0.version() >= min_version)
.collect()
})
})
.expect("dbs pool disconnected")
}
pub fn get_transactions_history(&self, pubkey: PublicKey) -> KvResult<TxsHistory> {
self.dbs_pool
.execute(move |dbs| {
duniter_gva_dbs_reader::txs_history::get_transactions_history_for_bma(
&dbs.gva_db,
&dbs.txs_mp_db,
pubkey,
)
})
.expect("dbs pool disconnected")
}
pub fn get_tx_by_hash(
&self,
hash: Hash,
) -> KvResult<Option<(TransactionDocumentV10, Option<BlockNumber>)>> {
self.dbs_pool
.execute(move |dbs| {
if let Some(tx) = dbs.txs_mp_db.txs().get(&HashKeyV2(hash))? {
Ok(Some((tx.0, None)))
} else if let Some(tx_db) = dbs.gva_db.txs().get(&HashKeyV2(hash))? {
Ok(Some((tx_db.tx, Some(tx_db.written_block.number))))
} else { } else {
Ok(None) Ok(duniter_server)
}
})
.expect("dbs pool disconnected")
}
/*
* WRITE FUNCTION FOR DUNITER JS ONLY
*/
pub fn add_pending_tx_force(&self, tx: TransactionDocumentV10) -> KvResult<()> {
let txs_mempool = self.txs_mempool;
self.dbs_pool
.execute(move |dbs| txs_mempool.add_pending_tx_force(&dbs.txs_mp_db, &tx))
.expect("dbs pool disconnected")
}
pub fn apply_block(&mut self, block: DubpBlockV10Stringified) -> KvResult<()> {
let block = Arc::new(
DubpBlockV10::from_string_object(&block)
.map_err(|e| KvError::DeserError(format!("{}", e)))?,
);
self.current = Some(duniter_dbs_write_ops::apply_block::apply_block(
&self.bc_db,
block.clone(),
self.current,
&self.dbs_pool,
false,
)?);
apply_block_modules(block, &self.conf, &self.dbs_pool, None)
}
pub fn apply_chunk_of_blocks(&mut self, blocks: Vec<DubpBlockV10Stringified>) -> KvResult<()> {
log::debug!("apply_chunk(#{})", blocks[0].number);
let blocks = Arc::from(
blocks
.into_iter()
.map(|block| DubpBlockV10::from_string_object(&block))
.collect::<Result<Vec<_>, _>>()
.map_err(|e| KvError::DeserError(format!("{}", e)))?,
);
self.current = Some(duniter_dbs_write_ops::apply_block::apply_chunk(
&self.bc_db,
self.current,
&self.dbs_pool,
blocks.clone(),
)?);
apply_chunk_of_blocks_modules(blocks, &self.conf, &self.dbs_pool, None)
}
pub fn receive_new_heads(
&self,
heads: Vec<(duniter_dbs::DunpNodeIdV1Db, duniter_dbs::DunpHeadDbV1)>,
) -> KvResult<()> {
self.dbs_pool
.execute(move |dbs| {
for (dunp_node_id, dunp_head) in heads {
dbs.cm_db
.dunp_heads_old_write()
.upsert(dunp_node_id, dunp_head)?
}
Ok::<(), KvError>(())
})
.expect("dbs pool disconnected")
} }
pub fn remove_all_pending_txs(&self) -> KvResult<()> {
self.dbs_pool
.execute(move |dbs| {
duniter_dbs_write_ops::txs_mp::remove_all_pending_txs(&dbs.txs_mp_db)
})
.expect("dbs pool disconnected")
} }
pub fn remove_pending_tx_by_hash(&self, hash: Hash) -> KvResult<()> {
self.dbs_pool
.execute(move |dbs| {
duniter_dbs_write_ops::txs_mp::remove_pending_tx_by_hash(&dbs.txs_mp_db, hash)
})
.expect("dbs pool disconnected")
}
pub fn revert_block(&mut self, block: DubpBlockV10Stringified) -> KvResult<()> {
let block = Arc::new(
DubpBlockV10::from_string_object(&block)
.map_err(|e| KvError::DeserError(format!("{}", e)))?,
);
let block_arc_clone = Arc::clone(&block);
let txs_mp_job_handle = self
.dbs_pool
.launch(move |dbs| {
duniter_dbs_write_ops::txs_mp::revert_block(
block_arc_clone.transactions(),
&dbs.txs_mp_db,
)
})
.expect("dbs pool disconnected");
self.current = duniter_dbs_write_ops::bc::revert_block(&self.bc_db, &block)?;
txs_mp_job_handle.join().expect("dbs pool disconnected")?;
revert_block_modules(block, &self.conf, &self.dbs_pool, None)
}
pub fn remove_all_peers(&self) -> KvResult<()> {
use duniter_dbs::databases::dunp_v1::DunpV1DbWritable as _;
self.dbs_pool
.execute(move |dbs| dbs.dunp_db.peers_old_write().clear())
.expect("dbs pool disconnected")
}
pub fn remove_peer_by_pubkey(&self, pubkey: PublicKey) -> KvResult<()> {
use duniter_dbs::databases::dunp_v1::DunpV1DbWritable as _;
self.dbs_pool
.execute(move |dbs| dbs.dunp_db.peers_old_write().remove(PubKeyKeyV2(pubkey)))
.expect("dbs pool disconnected")
}
pub fn save_peer(&self, new_peer_card: PeerCardStringified) -> anyhow::Result<()> {
use dubp::crypto::keys::PublicKey as _;
let pubkey = PublicKey::from_base58(&new_peer_card.pubkey)?;
use duniter_dbs::databases::dunp_v1::DunpV1DbWritable as _;
self.dbs_pool
.execute(move |dbs| {
dbs.dunp_db.peers_old_write().upsert(
PubKeyKeyV2(pubkey),
duniter_dbs::PeerCardDbV1 {
version: new_peer_card.version,
currency: new_peer_card.currency,
pubkey: new_peer_card.pubkey,
blockstamp: new_peer_card.blockstamp,
endpoints: new_peer_card.endpoints,
status: new_peer_card.status,
signature: new_peer_card.signature,
},
)
})
.expect("dbs pool disconnected")
.map_err(|e| e.into())
}
pub fn trim_expired_non_written_txs(&self, limit_time: i64) -> KvResult<()> {
self.dbs_pool
.execute(move |dbs| {
duniter_dbs_write_ops::txs_mp::trim_expired_non_written_txs(
&dbs.txs_mp_db,
limit_time,
)
})
.expect("dbs pool disconnected")
}
pub fn update_self_peer(&self, new_peer_card: PeerCardStringified) {
self.dbs_pool
.execute(move |dbs| {
dbs.cm_db
.self_peer_old_write()
.upsert(
(),
duniter_dbs::PeerCardDbV1 {
version: new_peer_card.version,
currency: new_peer_card.currency,
pubkey: new_peer_card.pubkey,
blockstamp: new_peer_card.blockstamp,
endpoints: new_peer_card.endpoints,
status: new_peer_card.status,
signature: new_peer_card.signature,
},
)
.expect("fail to write on memory db")
})
.expect("dbs pool disconnected")
} }
} }
...@@ -324,7 +324,14 @@ pub struct ServerMetaData { ...@@ -324,7 +324,14 @@ pub struct ServerMetaData {
} }
#[derive( #[derive(
async_graphql::SimpleObject, Clone, Debug, Default, serde::Deserialize, serde::Serialize, async_graphql::SimpleObject,
Clone,
Debug,
Default,
Eq,
PartialEq,
serde::Deserialize,
serde::Serialize,
)] )]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
#[graphql(name = "PeerCard")] #[graphql(name = "PeerCard")]
......
...@@ -57,7 +57,7 @@ impl Node { ...@@ -57,7 +57,7 @@ impl Node {
} }
/// Software /// Software
async fn software(&self) -> &'static str { async fn software(&self) -> &'static str {
"duniter" duniter_module::SOFTWARE_NAME
} }
/// Software version /// Software version
async fn version( async fn version(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment