Commit bbff027a authored by Éloïs's avatar Éloïs
Browse files

Merge branch 'ref/cm-to-global' into 'dev'

[feat] server: create global cache

See merge request nodes/typescript/duniter!1366
parents 8fb9eca8 6ce60364
......@@ -103,6 +103,9 @@ name = "arrayvec"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
dependencies = [
"serde",
]
[[package]]
name = "async-attributes"
......@@ -282,6 +285,16 @@ dependencies = [
"futures-micro",
]
[[package]]
name = "async-rwlock"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "261803dcc39ba9e72760ba6e16d0199b1eef9fc44e81bffabbebb9f5aea3906c"
dependencies = [
"async-mutex",
"event-listener",
]
[[package]]
name = "async-std"
version = "1.6.5"
......@@ -1154,12 +1167,14 @@ name = "duniter-bca"
version = "0.1.0"
dependencies = [
"anyhow",
"arrayvec",
"async-bincode",
"async_io_stream",
"bincode",
"dubp",
"duniter-bca-types",
"duniter-dbs",
"duniter-global",
"duniter-gva-db",
"duniter-gva-dbs-reader",
"duniter-mempools",
......@@ -1176,6 +1191,7 @@ dependencies = [
name = "duniter-bca-types"
version = "0.1.0"
dependencies = [
"arrayvec",
"bincode",
"dubp",
"serde",
......@@ -1249,13 +1265,28 @@ dependencies = [
"chrono",
"dubp",
"duniter-dbs",
"duniter-global",
"fast-threadpool",
"flume",
"log",
"maplit",
"resiter",
"serde_json",
]
[[package]]
name = "duniter-global"
version = "1.8.1"
dependencies = [
"async-rwlock",
"dubp",
"duniter-dbs",
"flume",
"mockall",
"once_cell",
"tokio",
]
[[package]]
name = "duniter-gva"
version = "0.1.0"
......@@ -1270,6 +1301,7 @@ dependencies = [
"duniter-bca",
"duniter-conf",
"duniter-dbs",
"duniter-global",
"duniter-gva-db",
"duniter-gva-dbs-reader",
"duniter-gva-gql",
......@@ -1315,6 +1347,7 @@ dependencies = [
"anyhow",
"arrayvec",
"dubp",
"duniter-bca-types",
"duniter-dbs",
"duniter-gva-db",
"maplit",
......@@ -1336,6 +1369,7 @@ dependencies = [
"duniter-bc-reader",
"duniter-conf",
"duniter-dbs",
"duniter-global",
"duniter-gva-db",
"duniter-gva-dbs-reader",
"duniter-mempools",
......@@ -1425,8 +1459,10 @@ dependencies = [
"dubp",
"duniter-conf",
"duniter-dbs",
"duniter-global",
"duniter-mempools",
"fast-threadpool",
"log",
"paste",
"tokio",
]
......@@ -1442,6 +1478,7 @@ dependencies = [
"duniter-conf",
"duniter-dbs",
"duniter-dbs-write-ops",
"duniter-global",
"duniter-gva",
"duniter-mempools",
"duniter-module",
......@@ -1450,7 +1487,6 @@ dependencies = [
"log",
"paste",
"resiter",
"tokio",
]
[[package]]
......
......@@ -41,6 +41,7 @@ members = [
"rust-libs/duniter-mempools",
"rust-libs/duniter-module",
"rust-libs/duniter-server",
"rust-libs/duniter-global",
"rust-libs/modules/gva",
"rust-libs/modules/gva/bca",
"rust-libs/modules/gva/bca/types",
......
......@@ -142,6 +142,7 @@ declare_types! {
let server = this.borrow(&guard);
server.server.get_self_endpoints()
}.map(|endpoints| {
log::info!("TMP DEBUG get_self_endpoints={:?}", endpoints);
let js_array = JsArray::new(&mut cx, endpoints.len() as u32);
for (i, ep) in endpoints.iter().enumerate() {
let js_string = cx.string(ep);
......
......@@ -119,7 +119,7 @@ fn migrate_inner(
})
.expect("gva:apply_chunk: dbs pool disconnected");
current = Some(duniter_dbs_write_ops::apply_block::apply_chunk(
bc_db, current, &dbs_pool, chunk,
bc_db, current, &dbs_pool, chunk, None,
)?);
gva_handle
.join()
......
......@@ -23,15 +23,7 @@
)]
use dubp::crypto::hashs::Hash;
use duniter_dbs::{
databases::{bc_v2::BcV2DbReadable, cm_v1::CmV1DbReadable},
kv_typed::prelude::*,
BlockMetaV2, HashKeyV2,
};
pub fn get_current_block_meta<CmDb: CmV1DbReadable>(cm_db: &CmDb) -> KvResult<Option<BlockMetaV2>> {
cm_db.current_block_meta().get(&())
}
use duniter_dbs::{databases::bc_v2::BcV2DbReadable, kv_typed::prelude::*, HashKeyV2};
pub fn tx_exist<BcDb: BcV2DbReadable>(bc_db_ro: &BcDb, hash: Hash) -> KvResult<bool> {
Ok(bc_db_ro.txs_hashs().contains_key(&HashKeyV2(hash))?)
......
......@@ -15,7 +15,9 @@ path = "src/lib.rs"
chrono = "0.4.19"
dubp = { version = "0.50.0", features = ["duniter"] }
duniter-dbs = { path = "../duniter-dbs" }
duniter-global = { path = "../duniter-global" }
fast-threadpool = "0.2.3"
flume = "0.10"
log = "0.4.11"
resiter = "0.4.0"
......
......@@ -20,11 +20,12 @@ pub fn apply_block(
block: Arc<DubpBlockV10>,
current_opt: Option<BlockMetaV2>,
dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<SharedDbs<FileBackend>>,
global_sender: &flume::Sender<GlobalBackGroundTaskMsg>,
throw_chainability: bool,
) -> KvResult<BlockMetaV2> {
if let Some(current) = current_opt {
if block.number().0 == current.number + 1 {
apply_block_inner(bc_db, dbs_pool, block)
apply_block_inner(bc_db, dbs_pool, block, global_sender)
} else if throw_chainability {
Err(KvError::Custom(
format!(
......@@ -38,7 +39,7 @@ pub fn apply_block(
Ok(current)
}
} else if block.number() == BlockNumber(0) {
apply_block_inner(bc_db, dbs_pool, block)
apply_block_inner(bc_db, dbs_pool, block, global_sender)
} else {
Err(KvError::Custom(
"Try to apply non genesis block on empty blockchain".into(),
......@@ -52,9 +53,10 @@ pub fn apply_chunk(
current_opt: Option<BlockMetaV2>,
dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<SharedDbs<FileBackend>>,
blocks: Arc<[DubpBlockV10]>,
global_sender: Option<&flume::Sender<GlobalBackGroundTaskMsg>>,
) -> KvResult<BlockMetaV2> {
verify_chunk_chainability(current_opt, &blocks)?;
apply_chunk_inner(bc_db, dbs_pool, blocks)
apply_chunk_inner(bc_db, dbs_pool, blocks, global_sender)
}
fn verify_chunk_chainability(
......@@ -105,17 +107,13 @@ fn apply_block_inner(
bc_db: &BcV2Db<FileBackend>,
dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<SharedDbs<FileBackend>>,
block: Arc<DubpBlockV10>,
global_sender: &flume::Sender<GlobalBackGroundTaskMsg>,
) -> KvResult<BlockMetaV2> {
let block_for_cm = Arc::clone(&block);
let block_for_txs_mp = Arc::clone(&block);
// Cm
let cm_handle = dbs_pool
.launch(move |dbs| {
crate::cm::apply_block(&block_for_cm, &dbs.cm_db)?;
Ok::<_, KvError>(())
})
.expect("dbs pool disconnected");
crate::cm::update_current_meta(&block_for_cm, &global_sender);
//TxsMp
let txs_mp_handle = dbs_pool
......@@ -128,7 +126,6 @@ fn apply_block_inner(
// Bc
let new_current = crate::bc::apply_block(bc_db, &block)?;
cm_handle.join().expect("dbs pool disconnected")?;
txs_mp_handle.join().expect("dbs pool disconnected")?;
Ok(new_current)
......@@ -138,18 +135,16 @@ fn apply_chunk_inner(
bc_db: &BcV2Db<FileBackend>,
dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<SharedDbs<FileBackend>>,
blocks: Arc<[DubpBlockV10]>,
global_sender: Option<&flume::Sender<GlobalBackGroundTaskMsg>>,
) -> KvResult<BlockMetaV2> {
let blocks_len = blocks.len();
let blocks_for_cm = Arc::clone(&blocks);
let blocks_for_txs_mp = Arc::clone(&blocks);
// Cm
let cm_handle = dbs_pool
.launch(move |dbs| {
let chunk_len = blocks_for_cm.len();
crate::cm::apply_block(&blocks_for_cm.deref()[chunk_len - 1], &dbs.cm_db)
})
.expect("dbs pool disconnected");
if let Some(global_sender) = global_sender {
let chunk_len = blocks.len();
crate::cm::update_current_meta(&&blocks.deref()[chunk_len - 1], &global_sender);
}
//TxsMp
//log::info!("apply_chunk: launch txs_mp job...");
......@@ -169,7 +164,6 @@ fn apply_chunk_inner(
}
let current_block = crate::bc::apply_block(bc_db, &blocks[blocks_len - 1])?;
cm_handle.join().expect("dbs pool disconnected")?;
txs_mp_handle
.join()
.expect("txs_mp_recv: dbs pool disconnected")?;
......
......@@ -14,27 +14,38 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::*;
use duniter_dbs::databases::bc_v2::BcV2DbReadable;
use duniter_dbs::BlockDbV2;
pub fn init(bc_db: &BcV2Db<FileBackend>, cm_db: &CmV1Db<MemSingleton>) -> KvResult<()> {
if let Some(current_block_meta) = bc_db
.blocks_meta()
.iter_rev(.., |it| it.values().next_res())?
{
cm_db
.current_block_meta_write()
.upsert((), current_block_meta)
} else {
Ok(())
}
#[inline(always)]
pub(crate) fn update_current_meta(
block: &DubpBlockV10,
global_sender: &flume::Sender<GlobalBackGroundTaskMsg>,
) {
let current_block_meta = block_to_block_meta(block);
global_sender
.send(GlobalBackGroundTaskMsg::NewCurrentBlock(current_block_meta))
.expect("global task disconnected");
}
pub fn apply_block(block: &DubpBlockV10, cm_db: &CmV1Db<MemSingleton>) -> KvResult<()> {
let block_meta = BlockMetaV2::from(block);
cm_db.current_block_meta_write().upsert((), block_meta)?;
cm_db
.current_block_write()
.upsert((), BlockDbV2(block.clone()))?;
Ok(())
fn block_to_block_meta(block: &DubpBlockV10) -> BlockMetaV2 {
BlockMetaV2 {
version: 10,
number: block.number().0,
hash: block.hash().0,
signature: block.signature(),
inner_hash: block.inner_hash(),
previous_hash: block.previous_hash(),
issuer: block.issuer(),
previous_issuer: dubp::crypto::keys::ed25519::PublicKey::default(),
time: block.local_time(),
pow_min: block.pow_min() as u32,
members_count: block.members_count() as u64,
issuers_count: block.issuers_count() as u32,
issuers_frame: block.issuers_frame() as u64,
issuers_frame_var: 0,
median_time: block.common_time(),
nonce: block.nonce(),
monetary_mass: block.monetary_mass(),
unit_base: block.unit_base() as u32,
dividend: block.dividend(),
}
}
......@@ -40,13 +40,13 @@ use dubp::wallet::prelude::*;
use duniter_dbs::{
databases::{
bc_v2::BcV2Db,
cm_v1::{CmV1Db, CmV1DbWritable},
txs_mp_v2::{TxsMpV2Db, TxsMpV2DbReadable, TxsMpV2DbWritable},
},
kv_typed::prelude::*,
BlockMetaV2, FileBackend, HashKeyV2, PendingTxDbV2, PubKeyKeyV2, PubKeyValV2, SharedDbs,
SourceAmountValV2, UtxoValV2, WalletConditionsV2,
};
use duniter_global::GlobalBackGroundTaskMsg;
use resiter::filter_map::FilterMap;
use resiter::flatten::Flatten;
use resiter::map::Map;
......
......@@ -15,11 +15,4 @@
use crate::*;
db_schema!(
CmV1,
[
["self_peer_old", SelfPeerOld, (), PeerCardDbV1],
["current_block_meta", CurrentBlockMeta, (), BlockMetaV2],
["current_block", CurrentBlock, (), BlockDbV2],
]
);
db_schema!(CmV1, [["current_block", CurrentBlock, (), BlockDbV2],]);
[package]
name = "duniter-global"
version = "1.8.1"
authors = ["librelois <elois@duniter.org>"]
license = "AGPL-3.0"
edition = "2018"
[dependencies]
async-rwlock = "1.3.0"
dubp = { version = "0.50.0", features = ["duniter"] }
duniter-dbs = { path = "../duniter-dbs" }
flume = "0.10"
mockall = { version = "0.9", optional = true }
once_cell = "1.5"
tokio = { version = "1.2", features = ["io-util", "rt-multi-thread"] }
[features]
mock = ["mockall"]
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#![deny(
clippy::unwrap_used,
missing_copy_implementations,
trivial_casts,
trivial_numeric_casts,
unstable_features,
unused_import_braces
)]
pub use tokio;
use async_rwlock::RwLock;
use dubp::wallet::prelude::SourceAmount;
use duniter_dbs::BlockMetaV2;
use once_cell::sync::OnceCell;
use std::ops::Deref;
pub static SELF_ENDPOINTS: RwLock<Option<Vec<String>>> = RwLock::new(None);
static ASYNC_RUNTIME: OnceCell<tokio::runtime::Runtime> = OnceCell::new();
static CURRENT_META: RwLock<Option<CurrentMeta>> = RwLock::new(None);
static SELF_PEER_OLD: RwLock<Option<duniter_dbs::PeerCardDbV1>> = RwLock::new(None);
#[derive(Clone, Copy, Debug, Default)]
pub struct CurrentMeta {
pub current_ud: SourceAmount,
pub current_block_meta: BlockMetaV2,
}
#[derive(Clone, Debug)]
pub enum GlobalBackGroundTaskMsg {
InitCurrentMeta(CurrentMeta),
NewCurrentBlock(BlockMetaV2),
GetSelfEndpoints(flume::Sender<Option<Vec<String>>>),
SetSelfPeerOld(duniter_dbs::PeerCardDbV1),
}
pub async fn start_global_background_task(recv: flume::Receiver<GlobalBackGroundTaskMsg>) {
tokio::spawn(async move {
while let Ok(msg) = recv.recv_async().await {
match msg {
GlobalBackGroundTaskMsg::InitCurrentMeta(current_meta) => {
let mut write_guard = CURRENT_META.write().await;
write_guard.replace(current_meta);
}
GlobalBackGroundTaskMsg::NewCurrentBlock(current_block_meta) => {
let upgradable_read_guard = CURRENT_META.upgradable_read().await;
let new_current_meta = if let Some(dividend) = current_block_meta.dividend {
CurrentMeta {
current_ud: dividend,
current_block_meta,
}
} else if let Some(current_meta) = upgradable_read_guard.deref() {
CurrentMeta {
current_ud: current_meta.current_ud,
current_block_meta,
}
} else {
CurrentMeta {
current_ud: SourceAmount::ZERO,
current_block_meta,
}
};
let mut write_guard =
async_rwlock::RwLockUpgradableReadGuard::upgrade(upgradable_read_guard)
.await;
write_guard.replace(new_current_meta);
}
GlobalBackGroundTaskMsg::GetSelfEndpoints(sender) => {
let read_guard = SELF_ENDPOINTS.read().await;
let _ = sender.send_async(read_guard.deref().clone()).await;
}
GlobalBackGroundTaskMsg::SetSelfPeerOld(self_peer_old) => {
let mut write_guard = SELF_PEER_OLD.write().await;
write_guard.replace(self_peer_old);
}
}
}
});
}
pub fn get_async_runtime() -> &'static tokio::runtime::Runtime {
ASYNC_RUNTIME.get_or_init(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("fail to build tokio runtime")
})
}
#[derive(Clone, Copy, Debug, Default)]
pub struct AsyncAccessor;
impl AsyncAccessor {
pub fn new() -> Self {
AsyncAccessor
}
pub async fn get_current_meta<D: 'static, F: 'static + FnOnce(&CurrentMeta) -> D>(
&self,
f: F,
) -> Option<D> {
let read_guard = CURRENT_META.read().await;
if let Some(current_meta) = read_guard.deref() {
Some(f(current_meta))
} else {
None
}
}
pub async fn get_self_peer_old<
D: 'static,
F: 'static + FnOnce(&duniter_dbs::PeerCardDbV1) -> D,
>(
&self,
f: F,
) -> Option<D> {
let read_guard = SELF_PEER_OLD.read().await;
if let Some(self_peer_old) = read_guard.deref() {
Some(f(self_peer_old))
} else {
None
}
}
}
#[cfg(feature = "mock")]
mockall::mock! {
pub AsyncAccessor {
pub async fn get_current_meta<D: 'static, F: 'static + FnOnce(&CurrentMeta) -> D>(
&self,
f: F,
) -> Option<D>;
pub async fn get_self_peer_old<
D: 'static,
F: 'static + FnOnce(&duniter_dbs::PeerCardDbV1) -> D,
>(
&self,
f: F,
) -> Option<D>;
}
}
......@@ -11,8 +11,10 @@ async-trait = "0.1.41"
dubp = { version = "0.50.0", features = ["duniter"] }
duniter-conf = { path = "../duniter-conf" }
duniter-dbs = { path = "../duniter-dbs" }
duniter-global = { path = "../duniter-global" }
duniter-mempools = { path = "../duniter-mempools" }
fast-threadpool = "0.2.3"
log = "0.4"
[dev-dependencies]
duniter-dbs = { path = "../duniter-dbs", features = ["mem"] }
......
......@@ -216,16 +216,8 @@ macro_rules! plug_duniter_modules {
all_endpoints.append(&mut endpoints);
)*
let self_peer = duniter_dbs::PeerCardDbV1 {
version: 10,
currency,
endpoints: all_endpoints,
..Default::default()
};
use duniter_dbs::databases::cm_v1::CmV1DbWritable as _;
use duniter_dbs::kv_typed::prelude::DbCollectionRw as _;
dbs_pool.execute(|dbs| dbs.cm_db.self_peer_old_write().upsert((), self_peer)).await?.context("fail to save self peer card")?;
log::info!("TMP DEBUG SELF_ENDPOINTS={:?}", all_endpoints);
duniter_global::SELF_ENDPOINTS.write().await.replace(all_endpoints);
$(
let [<$M:snake _handle>] = tokio::spawn([<$M:snake>].start());
......
......@@ -10,9 +10,10 @@ anyhow = "1.0.34"
cfg-if = "1.0.0"
dubp = { version = "0.50.0", features = ["duniter"] }
duniter-conf = { path = "../duniter-conf" }
duniter-dbs = { path = "../duniter-dbs" }
duniter-bc-reader = { path = "../duniter-bc-reader" }
duniter-dbs = { path = "../duniter-dbs" }
duniter-dbs-write-ops = { path = "../duniter-dbs-write-ops" }
duniter-global = { path = "../duniter-global" }
duniter-gva = { path = "../modules/gva", optional = true }
duniter-mempools = { path = "../duniter-mempools" }
duniter-module = { path = "../duniter-module" }
......@@ -21,7 +22,6 @@ flume = "0.10.0"
log = "0.4.11"
paste = "1.0.2"
resiter = "0.4.0"
tokio = { version = "1.2", features = ["io-util", "rt-multi-thread"] }
[features]
default = ["gva"]
......
// Copyright (C) 2020 Éloïs SANCHEZ.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::*;
use dubp::wallet::prelude::SourceAmount;
use duniter_dbs::databases::bc_v2::BcV2DbReadable;
use duniter_global::{CurrentMeta, GlobalBackGroundTaskMsg};
pub