Skip to content
Snippets Groups Projects
Commit 635a2cc5 authored by Éloïs's avatar Éloïs
Browse files

[ref] gva: move write ops specific to gva in gva folder

parent 447c1b94
No related branches found
No related tags found
1 merge request!1335Gva proto 2
Showing
with 275 additions and 223 deletions
......@@ -1070,6 +1070,7 @@ dependencies = [
"dubp",
"duniter-dbs",
"duniter-dbs-write-ops",
"duniter-gva-db-writer",
"fast-threadpool",
"rayon",
"serde",
......@@ -1141,6 +1142,7 @@ dependencies = [
"duniter-conf",
"duniter-dbs",
"duniter-dbs-read-ops",
"duniter-gva-db-writer",
"duniter-gva-dbs-reader",
"duniter-mempools",
"duniter-module",
......@@ -1159,6 +1161,17 @@ dependencies = [
"warp",
]
[[package]]
name = "duniter-gva-db-writer"
version = "0.1.0"
dependencies = [
"anyhow",
"dubp",
"duniter-dbs",
"resiter",
"smallvec",
]
[[package]]
name = "duniter-gva-dbs-reader"
version = "0.1.0"
......@@ -1346,9 +1359,9 @@ checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
[[package]]
name = "fast-threadpool"
version = "0.2.1"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf9f76ead92b1c1e372f552dcd2e331fb321d6b16886df58ed7586208e211e14"
checksum = "f7b7af2f4b094190e85f8ba81ac02159ca91edc5aacd490eebfcc3f53b444e11"
dependencies = [
"async-oneshot",
"flume",
......
......@@ -43,6 +43,7 @@ members = [
"rust-libs/duniter-server",
"rust-libs/modules/gva",
"rust-libs/modules/gva/dbs-reader",
"rust-libs/modules/gva/db-writer",
"rust-libs/tests/duniter-integration-tests",
"rust-libs/tools/kv_typed"
]
......
......@@ -25,7 +25,8 @@ dirs = "3.0.1"
dubp = { version = "0.32.2" }
duniter-dbs = { path = "../../rust-libs/duniter-dbs", default-features = false, features = ["explorer", "leveldb_backend", "sled_backend"] }
duniter-dbs-write-ops = { path = "../../rust-libs/duniter-dbs-write-ops", default-features = false, features = ["explorer", "leveldb_backend", "sled_backend"] }
fast-threadpool = "0.2.1"
duniter-gva-db-writer = { path = "../../rust-libs/modules/gva/db-writer" }
fast-threadpool = "0.2.2"
rayon = "1.3.1"
serde_json = "1.0.53"
structopt = "0.3.16"
......
......@@ -18,9 +18,9 @@ use dubp::{
block::parser::parse_json_block_from_serde_value, block::parser::ParseJsonBlockError,
block::prelude::DubpBlockTrait, block::DubpBlock, common::prelude::BlockNumber,
};
use duniter_dbs::BcV1DbReadable;
use duniter_dbs::{BcV1DbReadable, FileBackend};
use fast_threadpool::{ThreadPool, ThreadPoolConfig};
use std::path::PathBuf;
use std::{ops::Deref, path::PathBuf};
const CHUNK_SIZE: usize = 250;
......@@ -32,6 +32,22 @@ pub(crate) fn migrate(profile_path: PathBuf) -> anyhow::Result<()> {
dbs.bc_db.clear()?;
dbs.gva_db.clear()?;
if let Err(e) = migrate_inner(dbs.clone(), profile_path, start_time) {
// Clear bc_db and gva_db
dbs.bc_db.clear()?;
dbs.gva_db.clear()?;
Err(e)
} else {
Ok(())
}
}
fn migrate_inner(
dbs: DuniterDbs<FileBackend>,
profile_path: PathBuf,
start_time: Instant,
) -> anyhow::Result<()> {
let data_path = profile_path.join(crate::DATA_DIR);
let duniter_js_db = BcV1Db::<LevelDb>::open(LevelDbConf {
db_path: data_path.as_path().join("leveldb"),
......@@ -94,9 +110,22 @@ pub(crate) fn migrate(profile_path: PathBuf) -> anyhow::Result<()> {
chunk[0].number(),
chunk[chunk.len() - 1].number()
);
let chunk = Arc::from(chunk);
let chunk_arc_clone = Arc::clone(&chunk);
let gva_handle = dbs_pool
.launch(move |dbs| {
for block in chunk_arc_clone.deref() {
duniter_gva_db_writer::apply_block(block, &dbs.gva_db)?;
}
Ok::<_, KvError>(())
})
.expect("gva:apply_chunk: dbs pool disconnected");
current = Some(duniter_dbs_write_ops::apply_block::apply_chunk(
current, &dbs_pool, chunk, true,
current, &dbs_pool, chunk,
)?);
gva_handle
.join()
.expect("gva:apply_chunk: dbs pool disconnected")?;
}
}
......
......@@ -15,7 +15,7 @@ path = "src/lib.rs"
chrono = "0.4.19"
dubp = { version = "0.32.2" }
duniter-dbs = { path = "../duniter-dbs" }
fast-threadpool = "0.2.1"
fast-threadpool = "0.2.2"
log = "0.4.11"
resiter = "0.4.0"
......
......@@ -16,15 +16,14 @@
use crate::*;
pub fn apply_block(
block: DubpBlockV10,
block: Arc<DubpBlockV10>,
current_opt: Option<BlockMetaV2>,
dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<DuniterDbs<FileBackend>>,
gva: bool,
throw_chainability: bool,
) -> KvResult<BlockMetaV2> {
if let Some(current) = current_opt {
if block.number().0 == current.number + 1 {
apply_block_inner(dbs_pool, Arc::new(block), gva)
apply_block_inner(dbs_pool, block)
} else if throw_chainability {
Err(KvError::Custom(
format!(
......@@ -38,7 +37,7 @@ pub fn apply_block(
Ok(current)
}
} else if block.number() == BlockNumber(0) {
apply_block_inner(dbs_pool, Arc::new(block), gva)
apply_block_inner(dbs_pool, block)
} else {
Err(KvError::Custom(
"Try to apply non genesis block on empty blockchain".into(),
......@@ -50,11 +49,10 @@ pub fn apply_block(
pub fn apply_chunk(
current_opt: Option<BlockMetaV2>,
dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<DuniterDbs<FileBackend>>,
blocks: Vec<DubpBlockV10>,
gva: bool,
blocks: Arc<[DubpBlockV10]>,
) -> KvResult<BlockMetaV2> {
verify_chunk_chainability(current_opt, &blocks)?;
apply_chunk_inner(dbs_pool, Arc::new(blocks), gva)
apply_chunk_inner(dbs_pool, blocks)
}
fn verify_chunk_chainability(
......@@ -104,7 +102,6 @@ fn verify_chunk_chainability(
fn apply_block_inner(
dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<DuniterDbs<FileBackend>>,
block: Arc<DubpBlockV10>,
gva: bool,
) -> KvResult<BlockMetaV2> {
// Bc
let block_arc = Arc::clone(&block);
......@@ -119,24 +116,14 @@ fn apply_block_inner(
Ok::<_, KvError>(())
})
.expect("dbs pool disconnected");
// Gva
if gva {
let block_arc = Arc::clone(&block);
dbs_pool
.execute(move |dbs| {
crate::gva::apply_block(&block_arc, &dbs.gva_db)?;
Ok::<_, KvError>(())
})
.expect("dbs pool disconnected")?;
}
txs_mp_recv.join().expect("dbs pool disconnected")?;
bc_recv.join().expect("dbs pool disconnected")
}
fn apply_chunk_inner(
dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<DuniterDbs<FileBackend>>,
blocks: Arc<Vec<DubpBlockV10>>,
gva: bool,
blocks: Arc<[DubpBlockV10]>,
) -> KvResult<BlockMetaV2> {
// Bc
let blocks_len = blocks.len();
......@@ -161,20 +148,6 @@ fn apply_chunk_inner(
Ok::<_, KvError>(())
})
.expect("apply_chunk_inner:txs_mp: dbs pool disconnected");
// Gva
if gva {
let blocks_arc = Arc::clone(&blocks);
//log::info!("apply_chunk: launch gva job...");
dbs_pool
.execute(move |dbs| {
for block in blocks_arc.deref() {
crate::gva::apply_block(&block, &dbs.gva_db)?;
}
Ok::<_, KvError>(())
})
.expect("apply_chunk_inner:gva: dbs pool disconnected")?;
//log::info!("apply_chunk: gva job finish.");
}
txs_mp_handle
.join()
.expect("txs_mp_recv: dbs pool disconnected")?;
......
......@@ -85,7 +85,7 @@ pub fn apply_block<B: Backend>(
pub fn revert_block<B: Backend>(
bc_db: &duniter_dbs::bc_v2::BcV2Db<B>,
block: DubpBlockV10,
block: &DubpBlockV10,
) -> KvResult<Option<BlockMetaV2>> {
(
bc_db.blocks_meta_write(),
......
......@@ -24,7 +24,6 @@
pub mod apply_block;
pub mod bc;
pub mod gva;
pub mod txs_mp;
use std::borrow::Cow;
......@@ -37,157 +36,12 @@ use dubp::documents::{
transaction::TransactionDocumentV10,
};
use dubp::wallet::prelude::*;
use duniter_dbs::gva_v1::{TxsByIssuerEvent, TxsByRecipientEvent, TxsEvent};
use duniter_dbs::{
kv_typed::prelude::*, BlockMetaV2, DuniterDbs, FileBackend, GvaV1Db, GvaV1DbReadable,
GvaV1DbWritable, HashKeyV2, PendingTxDbV2, PubKeyKeyV2, PubKeyValV2, SourceAmountValV2, TxDbV2,
TxsMpV2Db, TxsMpV2DbReadable, TxsMpV2DbWritable, UtxoValV2, WalletConditionsV2,
kv_typed::prelude::*, BlockMetaV2, DuniterDbs, FileBackend, HashKeyV2, PendingTxDbV2,
PubKeyKeyV2, PubKeyValV2, SourceAmountValV2, TxsMpV2Db, TxsMpV2DbReadable, TxsMpV2DbWritable,
UtxoValV2, WalletConditionsV2,
};
use resiter::filter::Filter;
use resiter::filter_map::FilterMap;
use resiter::flatten::Flatten;
use resiter::map::Map;
use std::{collections::HashMap, ops::Deref};
pub struct UtxoV10<'s> {
pub id: UtxoIdV10,
pub amount: SourceAmount,
pub script: &'s WalletScriptV10,
pub written_block: BlockNumber,
}
#[cfg(test)]
mod tests {
use super::*;
use dubp::{
documents::transaction::TransactionDocumentV10Stringified,
documents_parser::prelude::FromStringObject,
};
#[test]
#[ignore]
fn tmp_apply_block_real() -> KvResult<()> {
let gva_db = GvaV1Db::<Sled>::open(
SledConf::default()
.path("/home/elois/.config/duniter/s2/data/gva_v1_sled")
.flush_every_ms(None),
)?;
/*let txs_mp_db = TxsMpV2Db::<Sled>::open(
SledConf::default()
.path("/home/elois/.config/duniter/s2/data/txs_mp_v2_sled")
.flush_every_ms(None),
)?;*/
let txs: Vec<TransactionDocumentV10Stringified> = serde_json::from_str(r#"[
{
"version": 10,
"currency": "g1",
"comment": ". je me sens plus legere mm si....reste le bon toit a trouver dans un temps record ! Merci pour cet eclairage fort",
"locktime": 0,
"signatures": [
"8t5vo+k5OvkyAd+L+J8g6MLpp/AP0qOQFcJvf+OPMEZaVnHH38YtCigo64unU9aCsb9zZc6UEc78ZrkQ/E2TCg=="
],
"outputs": [
"5000:0:SIG(5VYg9YHvLQuoky7EPyyk3cEfBUtB1GuAeJ6SiJ6c9wWe)",
"55:0:SIG(Ceq5Y6W5kjFkPrvcx5oAgugLMTwcEXyWgfn3P85TSj7x)"
],
"inputs": [
"1011:0:D:Ceq5Y6W5kjFkPrvcx5oAgugLMTwcEXyWgfn3P85TSj7x:296658",
"1011:0:D:Ceq5Y6W5kjFkPrvcx5oAgugLMTwcEXyWgfn3P85TSj7x:296936",
"1011:0:D:Ceq5Y6W5kjFkPrvcx5oAgugLMTwcEXyWgfn3P85TSj7x:297211",
"1011:0:D:Ceq5Y6W5kjFkPrvcx5oAgugLMTwcEXyWgfn3P85TSj7x:297489",
"1011:0:D:Ceq5Y6W5kjFkPrvcx5oAgugLMTwcEXyWgfn3P85TSj7x:297786"
],
"unlocks": [
"0:SIG(0)",
"1:SIG(0)",
"2:SIG(0)",
"3:SIG(0)",
"4:SIG(0)"
],
"blockstamp": "304284-000003F738B9A5FC8F5D04B4B9746FD899B3A49367099BB2796E7EF976DCDABB",
"blockstampTime": 0,
"issuers": [
"Ceq5Y6W5kjFkPrvcx5oAgugLMTwcEXyWgfn3P85TSj7x"
],
"block_number": 0,
"time": 0
},
{
"version": 10,
"currency": "g1",
"comment": "Pour les places de cine et l expedition ..Merci",
"locktime": 0,
"signatures": [
"VhzwAwsCr30XnetveS74QD2kJMYCQ89VZvyUBJM9DP/kd5KBqkF1c1HcKpJdHrfu2oq3JbSEIhEf/aLgnEdSCw=="
],
"outputs": [
"6000:0:SIG(jUPLL2BgY2QpheWEY3R13edV2Y4tvQMCXjJVM8PGDvyd)",
"10347:0:SIG(2CWxxkttvkGSUVZdaUZHiksNisDC3wJx32Y2NVAyeHez)"
],
"inputs": [
"347:0:T:4EA4D01422469ABA380F48A48254EB3F15606C12FE4CFF7E7D6EEB1FD9752DDB:1",
"16000:0:T:9A4DA56EF5F9B50D612D806BAE0886EB3033B4F166D2E96498DE16B83F39B59D:0"
],
"unlocks": [
"0:SIG(0)",
"1:SIG(0)"
],
"blockstamp": "304284-000003F738B9A5FC8F5D04B4B9746FD899B3A49367099BB2796E7EF976DCDABB",
"blockstampTime": 0,
"issuers": [
"2CWxxkttvkGSUVZdaUZHiksNisDC3wJx32Y2NVAyeHez"
],
"block_number": 0,
"time": 0
},
{
"version": 10,
"currency": "g1",
"comment": "POur le sac a tarte merci",
"locktime": 0,
"signatures": [
"721K4f+F9PgksoVDZgQTURJIO/DZUhQfAzXfBvYrFkgqHNNeBbcgGecFX63rPYjFvau+qg1Hmi0coL9z7r7EAQ=="
],
"outputs": [
"15000:0:SIG(KxyNK1k55PEA8eBjX1K4dLJr35gC2dwMwNFPHwvZFH4)",
"17668:0:SIG(4VQvVLT1R6upLuRk85A5eWTowqJwvkSMGQQZ9Hc4bqLg)"
],
"inputs": [
"1011:0:D:4VQvVLT1R6upLuRk85A5eWTowqJwvkSMGQQZ9Hc4bqLg:303924",
"1011:0:D:4VQvVLT1R6upLuRk85A5eWTowqJwvkSMGQQZ9Hc4bqLg:304212",
"10458:0:T:55113E18AB61603AD0FC24CD11ACBC96F9583FD0A5877055F17315E9613BBF7D:1",
"20188:0:T:937A0454C1A63B383FBB6D219B9312B0A36DFE19DA08076BD113F9D5D4FC903D:1"
],
"unlocks": [
"0:SIG(0)",
"1:SIG(0)",
"2:SIG(0)",
"3:SIG(0)"
],
"blockstamp": "304284-000003F738B9A5FC8F5D04B4B9746FD899B3A49367099BB2796E7EF976DCDABB",
"blockstampTime": 0,
"issuers": [
"4VQvVLT1R6upLuRk85A5eWTowqJwvkSMGQQZ9Hc4bqLg"
],
"block_number": 0,
"time": 0
}
]"#).expect("wrong tx");
let block = DubpBlockV10Stringified {
number: 304286,
hash: Some(
"000001339AECF3CAB78B2B61776FB3819B800AB43923F4F8BD0F5AE47B7DEAB9".to_owned(),
),
median_time: 1583862823,
transactions: txs,
..Default::default()
};
let block = DubpBlockV10::from_string_object(&block).expect("fail to parse block");
gva::apply_block(&block, &gva_db)?;
Ok(())
}
}
use std::ops::Deref;
......@@ -12,7 +12,7 @@ dubp = { version = "0.32.2" }
duniter-conf = { path = "../duniter-conf" }
duniter-dbs = { path = "../duniter-dbs" }
duniter-mempools = { path = "../duniter-mempools" }
fast-threadpool = "0.2.1"
fast-threadpool = "0.2.2"
[dev-dependencies]
duniter-dbs = { path = "../duniter-dbs", features = ["mem"] }
......
......@@ -22,15 +22,44 @@
unused_import_braces
)]
use dubp::block::DubpBlockV10;
use duniter_conf::DuniterConf;
use duniter_dbs::{DuniterDbs, FileBackend};
use duniter_dbs::{kv_typed::prelude::*, DuniterDbs, FileBackend};
use duniter_mempools::Mempools;
use fast_threadpool::{JoinHandle, ThreadPoolDisconnected};
use std::path::Path;
pub type Endpoint = String;
#[async_trait::async_trait]
pub trait DuniterModule: 'static + Sized {
fn apply_block(
_block: Arc<DubpBlockV10>,
_conf: &duniter_conf::DuniterConf,
_dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<DuniterDbs<FileBackend>>,
_profile_path_opt: Option<&Path>,
) -> Result<Option<JoinHandle<KvResult<()>>>, ThreadPoolDisconnected> {
Ok(None)
}
fn apply_chunk_of_blocks(
_blocks: Arc<[DubpBlockV10]>,
_conf: &duniter_conf::DuniterConf,
_dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<DuniterDbs<FileBackend>>,
_profile_path_opt: Option<&Path>,
) -> Result<Option<JoinHandle<KvResult<()>>>, ThreadPoolDisconnected> {
Ok(None)
}
fn revert_block(
_block: Arc<DubpBlockV10>,
_conf: &duniter_conf::DuniterConf,
_dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<DuniterDbs<FileBackend>>,
_profile_path_opt: Option<&Path>,
) -> Result<Option<JoinHandle<KvResult<()>>>, ThreadPoolDisconnected> {
Ok(None)
}
fn init(
conf: &DuniterConf,
currency: &str,
......@@ -48,6 +77,57 @@ macro_rules! plug_duniter_modules {
([$($M:ty),*]) => {
paste::paste! {
use anyhow::Context as _;
#[allow(dead_code)]
fn apply_block_modules(
block: Arc<DubpBlockV10>,
conf: &duniter_conf::DuniterConf,
dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<DuniterDbs<FileBackend>>,
profile_path_opt: Option<&Path>,
) -> KvResult<()> {
$(
let [<$M:snake>] = <$M>::apply_block(block.clone(), conf, dbs_pool, profile_path_opt).expect("thread pool disconnected");
)*
$(
if let Some(join_handle) = [<$M:snake>] {
join_handle.join().expect("thread pool disconnected")?;
}
)*
Ok(())
}
#[allow(dead_code)]
fn apply_chunk_of_blocks_modules(
blocks: Arc<[DubpBlockV10]>,
conf: &duniter_conf::DuniterConf,
dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<DuniterDbs<FileBackend>>,
profile_path_opt: Option<&Path>,
) -> KvResult<()> {
$(
let [<$M:snake>] = <$M>::apply_chunk_of_blocks(blocks.clone(), conf, dbs_pool, profile_path_opt).expect("thread pool disconnected");
)*
$(
if let Some(join_handle) = [<$M:snake>] {
join_handle.join().expect("thread pool disconnected")?;
}
)*
Ok(())
}
#[allow(dead_code)]
fn revert_block_modules(
block: Arc<DubpBlockV10>,
conf: &duniter_conf::DuniterConf,
dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<DuniterDbs<FileBackend>>,
profile_path_opt: Option<&Path>,
) -> KvResult<()> {
$(
let [<$M:snake>] = <$M>::revert_block(block.clone(), conf, dbs_pool, profile_path_opt).expect("thread pool disconnected");
)*
$(
if let Some(join_handle) = [<$M:snake>] {
join_handle.join().expect("thread pool disconnected")?;
}
)*
Ok(())
}
async fn start_duniter_modules(
conf: &DuniterConf,
currency: String,
......
......@@ -16,7 +16,7 @@ duniter-gva = { path = "../modules/gva" }
duniter-gva-dbs-reader = { path = "../modules/gva/dbs-reader" }
duniter-mempools = { path = "../duniter-mempools" }
duniter-module = { path = "../duniter-module" }
fast-threadpool = "0.2.1"
fast-threadpool = "0.2.2"
flume = "0.9.1"
log = "0.4.11"
paste = "1.0.2"
......
......@@ -257,49 +257,51 @@ impl DuniterServer {
.expect("dbs pool disconnected")
}
pub fn revert_block(&mut self, block: DubpBlockV10Stringified) -> KvResult<()> {
let gva = self.conf.gva.is_some();
let block = DubpBlockV10::from_string_object(&block)
.map_err(|e| KvError::DeserError(format!("{}", e)))?;
let block = Arc::new(
DubpBlockV10::from_string_object(&block)
.map_err(|e| KvError::DeserError(format!("{}", e)))?,
);
let block_arc_clone = Arc::clone(&block);
self.current = self
.dbs_pool
.execute(move |dbs| {
duniter_dbs_write_ops::txs_mp::revert_block(block.transactions(), &dbs.txs_mp_db)?;
if gva {
duniter_dbs_write_ops::gva::revert_block(&block, &dbs.gva_db)?;
}
duniter_dbs_write_ops::bc::revert_block(&dbs.bc_db, block)
duniter_dbs_write_ops::txs_mp::revert_block(
block_arc_clone.transactions(),
&dbs.txs_mp_db,
)?;
duniter_dbs_write_ops::bc::revert_block(&dbs.bc_db, &block_arc_clone)
})
.expect("dbs pool disconnected")?;
Ok(())
revert_block_modules(block, &self.conf, &self.dbs_pool, None)
}
pub fn apply_block(&mut self, block: DubpBlockV10Stringified) -> KvResult<()> {
let gva = self.conf.gva.is_some();
let block = DubpBlockV10::from_string_object(&block)
.map_err(|e| KvError::DeserError(format!("{}", e)))?;
let block = Arc::new(
DubpBlockV10::from_string_object(&block)
.map_err(|e| KvError::DeserError(format!("{}", e)))?,
);
self.current = Some(duniter_dbs_write_ops::apply_block::apply_block(
block,
block.clone(),
self.current,
&self.dbs_pool,
gva,
false,
)?);
Ok(())
apply_block_modules(block, &self.conf, &self.dbs_pool, None)
}
pub fn apply_chunk_of_blocks(&mut self, blocks: Vec<DubpBlockV10Stringified>) -> KvResult<()> {
log::debug!("apply_chunk(#{})", blocks[0].number);
let gva = self.conf.gva.is_some();
let blocks = blocks
let blocks = Arc::from(
blocks
.into_iter()
.map(|block| DubpBlockV10::from_string_object(&block))
.collect::<Result<Vec<_>, _>>()
.map_err(|e| KvError::DeserError(format!("{}", e)))?;
.map_err(|e| KvError::DeserError(format!("{}", e)))?,
);
self.current = Some(duniter_dbs_write_ops::apply_block::apply_chunk(
self.current,
&self.dbs_pool,
blocks,
gva,
blocks.clone(),
)?);
Ok(())
apply_chunk_of_blocks_modules(blocks, &self.conf, &self.dbs_pool, None)
}
pub fn trim_expired_non_written_txs(&self, limit_time: i64) -> KvResult<()> {
self.dbs_pool
......
......@@ -16,9 +16,10 @@ duniter-conf = { path = "../../duniter-conf" }
duniter-dbs = { path = "../../duniter-dbs" }
duniter-dbs-read-ops = { path = "../../duniter-dbs-read-ops" }
duniter-gva-dbs-reader = { path = "./dbs-reader" }
duniter-gva-db-writer = { path = "./db-writer" }
duniter-mempools = { path = "../../duniter-mempools" }
duniter-module = { path = "../../duniter-module" }
fast-threadpool = "0.2.1"
fast-threadpool = "0.2.2"
flume = "0.9.1"
futures = "0.3.6"
http = "0.2.1"
......
[package]
name = "duniter-gva-db-writer"
version = "0.1.0"
authors = ["elois <elois@duniter.org>"]
description = "Duniter GVA DB writer"
repository = "https://git.duniter.org/nodes/typescript/duniter"
keywords = ["dubp", "duniter", "blockchain", "database"]
license = "AGPL-3.0"
edition = "2018"
[lib]
path = "src/lib.rs"
[dependencies]
anyhow = "1.0.34"
duniter-dbs = { path = "../../../duniter-dbs" }
dubp = { version = "0.32.2" }
resiter = "0.4.0"
[dev-dependencies]
smallvec = { version = "1.4.0", features = ["serde", "write"] }
......@@ -13,12 +13,40 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
#![deny(
clippy::unwrap_used,
missing_copy_implementations,
trivial_casts,
trivial_numeric_casts,
unstable_features,
unused_import_braces
)]
mod identities;
mod tx;
mod utxos;
use crate::*;
use duniter_dbs::gva_v1::{BalancesEvent, GvaIdentitiesEvent};
use dubp::block::prelude::*;
use dubp::common::crypto::hashs::Hash;
use dubp::common::prelude::*;
use dubp::documents::{
prelude::*, transaction::TransactionDocumentTrait, transaction::TransactionDocumentV10,
};
use dubp::wallet::prelude::*;
use duniter_dbs::gva_v1::*;
use duniter_dbs::{
kv_typed::prelude::*, GvaV1Db, GvaV1DbReadable, GvaV1DbWritable, HashKeyV2, PubKeyKeyV2,
SourceAmountValV2, TxDbV2, WalletConditionsV2,
};
use resiter::filter::Filter;
use std::collections::HashMap;
pub struct UtxoV10<'s> {
pub id: UtxoIdV10,
pub amount: SourceAmount,
pub script: &'s WalletScriptV10,
pub written_block: BlockNumber,
}
pub fn apply_block<B: Backend>(block: &DubpBlockV10, gva_db: &GvaV1Db<B>) -> KvResult<()> {
let blockstamp = Blockstamp {
......
......@@ -312,6 +312,7 @@ mod tests {
documents::smallvec::smallvec as svec, documents::transaction::v10::*,
documents::transaction::UTXOConditions,
};
use duniter_dbs::BlockMetaV2;
#[test]
fn test_apply_tx() -> KvResult<()> {
......
......@@ -50,6 +50,7 @@ use crate::tests::create_dbs_reader;
use crate::tests::DbsReader;
use async_graphql::http::GraphQLPlaygroundConfig;
use async_graphql::validators::{IntGreaterThan, ListMinLength, StringMaxLength, StringMinLength};
use dubp::block::DubpBlockV10;
use dubp::common::crypto::keys::{ed25519::PublicKey, KeyPair as _, PublicKey as _};
use dubp::common::prelude::*;
use dubp::documents::prelude::*;
......@@ -63,11 +64,13 @@ use duniter_gva_dbs_reader::create_dbs_reader;
#[cfg(not(test))]
use duniter_gva_dbs_reader::DbsReader;
use duniter_mempools::{Mempools, TxsMempool};
use fast_threadpool::{JoinHandle, ThreadPoolDisconnected};
use futures::{StreamExt, TryStreamExt};
use resiter::map::Map;
use std::{
convert::{Infallible, TryFrom},
ops::Deref,
path::Path,
};
use warp::{http::Response as HttpResponse, Filter as _, Rejection, Stream};
......@@ -83,6 +86,51 @@ pub struct GvaModule {
#[async_trait::async_trait]
impl duniter_module::DuniterModule for GvaModule {
fn apply_block(
block: Arc<DubpBlockV10>,
conf: &duniter_conf::DuniterConf,
dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<DuniterDbs<FileBackend>>,
_profile_path_opt: Option<&Path>,
) -> Result<Option<JoinHandle<KvResult<()>>>, ThreadPoolDisconnected> {
if conf.gva.is_some() {
Ok(Some(dbs_pool.launch(move |dbs| {
duniter_gva_db_writer::apply_block(&block, &dbs.gva_db)
})?))
} else {
Ok(None)
}
}
fn apply_chunk_of_blocks(
blocks: Arc<[DubpBlockV10]>,
conf: &duniter_conf::DuniterConf,
dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<DuniterDbs<FileBackend>>,
_profile_path_opt: Option<&Path>,
) -> Result<Option<JoinHandle<KvResult<()>>>, ThreadPoolDisconnected> {
if conf.gva.is_some() {
Ok(Some(dbs_pool.launch(move |dbs| {
for block in blocks.deref() {
duniter_gva_db_writer::apply_block(&block, &dbs.gva_db)?;
}
Ok::<_, KvError>(())
})?))
} else {
Ok(None)
}
}
fn revert_block(
block: Arc<DubpBlockV10>,
conf: &duniter_conf::DuniterConf,
dbs_pool: &fast_threadpool::ThreadPoolSyncHandler<DuniterDbs<FileBackend>>,
_profile_path_opt: Option<&Path>,
) -> Result<Option<JoinHandle<KvResult<()>>>, ThreadPoolDisconnected> {
if conf.gva.is_some() {
Ok(Some(dbs_pool.launch(move |dbs| {
duniter_gva_db_writer::revert_block(&block, &dbs.gva_db)
})?))
} else {
Ok(None)
}
}
fn init(
conf: &duniter_conf::DuniterConf,
currency: &str,
......
......@@ -16,7 +16,7 @@ duniter-gva = { path = "../../modules/gva" }
duniter-mempools = { path = "../../duniter-mempools" }
duniter-module = { path = "../../duniter-module" }
duniter-server = { path = "../../duniter-server" }
fast-threadpool = "0.2.1"
fast-threadpool = "0.2.2"
flume = "0.9.1"
log = "0.4.11"
paste = "1.0.2"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment