Skip to content
Snippets Groups Projects
Commit 7b5db30c authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[fix] #1038 Refuse to treat simultaneously the same document

parent a80377eb
Branches
Tags
No related merge requests found
Showing
with 246 additions and 75 deletions
......@@ -120,6 +120,7 @@ export const CommonConstants = {
ERRORS: {
// Technical errors
WRONG_DOCUMENT: { httpCode: 400, uerr: { ucode: 1005, message: "Document has unkown fields or wrong line ending format" }},
DOCUMENT_BEING_TREATED: { httpCode: 400, uerr: { ucode: 1015, message: "Document already under treatment" }},
// Business errors
WRONG_UNLOCKER: { httpCode: 400, uerr: { ucode: 2013, message: "Wrong unlocker in transaction" }},
......
import {IdentityDTO} from "./IdentityDTO"
import {Buid} from "../common-libs/buid"
import {Cloneable} from "./Cloneable";
import {hashf} from "../common";
const DEFAULT_DOCUMENT_VERSION = 10
......@@ -108,4 +109,8 @@ export class CertificationDTO extends ShortCertificationDTO implements Cloneable
obj.idty_sig
)
}
getHash() {
return hashf(this.getRawSigned())
}
}
\ No newline at end of file
......@@ -108,4 +108,8 @@ export class IdentityDTO {
})
)
}
getHash() {
return hashf(this.getRawSigned())
}
}
\ No newline at end of file
import {IdentityDTO} from "./IdentityDTO"
import * as moment from "moment"
import {Cloneable} from "./Cloneable";
import {hashf} from "../common";
const DEFAULT_DOCUMENT_VERSION = 10
......@@ -128,4 +129,8 @@ export class MembershipDTO implements Cloneable {
obj.signature
)
}
getHash() {
return hashf(this.getRawSigned())
}
}
import {Cloneable} from "./Cloneable";
import {hashf} from "../common";
const DEFAULT_DOCUMENT_VERSION = 10
export interface ShortRevocation {
......@@ -61,4 +62,8 @@ export class RevocationDTO implements ShortRevocation, Cloneable {
json.revocation || json.revocation
)
}
getHash() {
return hashf(this.getRaw())
}
}
\ No newline at end of file
"use strict";
import {GlobalFifoPromise} from "./GlobalFifoPromise"
import {BlockchainContext} from "../lib/computation/BlockchainContext"
import {ConfDTO} from "../lib/dto/ConfDTO"
import {FileDAL} from "../lib/dal/fileDAL"
import {QuickSynchronizer} from "../lib/computation/QuickSync"
import {BlockDTO} from "../lib/dto/BlockDTO"
import {DBIdentity} from "../lib/dal/sqliteDAL/IdentityDAL"
import {DBBlock} from "../lib/db/DBBlock"
import {GLOBAL_RULES_HELPERS} from "../lib/rules/global_rules"
import {parsers} from "../lib/common-libs/parsers/index"
import {GlobalFifoPromise} from "./GlobalFifoPromise";
import {BlockchainContext} from "../lib/computation/BlockchainContext";
import {ConfDTO} from "../lib/dto/ConfDTO";
import {FileDAL} from "../lib/dal/fileDAL";
import {QuickSynchronizer} from "../lib/computation/QuickSync";
import {BlockDTO} from "../lib/dto/BlockDTO";
import {DBIdentity} from "../lib/dal/sqliteDAL/IdentityDAL";
import {DBBlock} from "../lib/db/DBBlock";
import {GLOBAL_RULES_HELPERS} from "../lib/rules/global_rules";
import {parsers} from "../lib/common-libs/parsers/index";
import {HttpIdentityRequirement} from "../modules/bma/lib/dtos";
import {FIFOService} from "./FIFOService";
const _ = require('underscore');
const constants = require('../lib/constants');
const CHECK_ALL_RULES = true;
export class BlockchainService {
export class BlockchainService extends FIFOService {
mainContext:BlockchainContext
conf:ConfDTO
......@@ -25,7 +26,8 @@ export class BlockchainService {
selfPubkey:string
quickSynchronizer:QuickSynchronizer
constructor(private server:any) {
constructor(private server:any, fifoPromiseHandler:GlobalFifoPromise) {
super(fifoPromiseHandler)
this.mainContext = new BlockchainContext()
}
......@@ -115,7 +117,9 @@ export class BlockchainService {
}
submitBlock(obj:any, doCheck:boolean, forkAllowed:boolean): Promise<BlockDTO> {
return GlobalFifoPromise.pushFIFO(() => {
const dto = BlockDTO.fromJSONObject(obj)
const hash = dto.getHash()
return this.pushFIFO(hash, () => {
return this.checkAndAddBlock(obj, doCheck, forkAllowed)
})
}
......@@ -237,12 +241,12 @@ export class BlockchainService {
}
revertCurrentBlock() {
return GlobalFifoPromise.pushFIFO(() => this.mainContext.revertCurrentBlock())
return this.pushFIFO("revertCurrentBlock", () => this.mainContext.revertCurrentBlock())
}
applyNextAvailableFork() {
return GlobalFifoPromise.pushFIFO(() => this.mainContext.applyNextAvailableFork())
return this.pushFIFO("applyNextAvailableFork", () => this.mainContext.applyNextAvailableFork())
}
......
import {GlobalFifoPromise} from "./GlobalFifoPromise";
export abstract class FIFOService {
constructor(private fifoPromiseHandler:GlobalFifoPromise) {}
async pushFIFO<T>(operationId: string, p: () => Promise<T>): Promise<T> {
return this.fifoPromiseHandler.pushFIFOPromise(operationId, p)
}
}
\ No newline at end of file
"use strict";
import {CommonConstants} from "../lib/common-libs/constants";
const async = require('async');
const fifo = async.queue(function (task:any, callback:any) {
export class GlobalFifoPromise {
private fifo:any = async.queue(function (task:any, callback:any) {
task(callback);
}, 1);
}, 1)
export class GlobalFifoPromise {
private operations:{ [k:string]: boolean } = {}
static getLen() {
return fifo.length()
constructor() {
}
/**
* Adds a promise to a FIFO stack of promises, so the given promise will be executed against a shared FIFO stack.
* @param operationId The ID of the operation, which indicates which task to reject if the FIFO already contains it
* @param p
*/
static pushFIFO<T>(p: () => Promise<T>): Promise<T> {
pushFIFOPromise<T>(operationId: string, p: () => Promise<T>): Promise<T> {
// Return a promise that will be done after the fifo has executed the given promise
return new Promise((resolve:any, reject:any) => {
if (this.operations[operationId]) {
throw CommonConstants.ERRORS.DOCUMENT_BEING_TREATED
}
this.operations[operationId] = true
// Push the promise on the stack
fifo.push(async (cb:any) => {
this.fifo.push(async (cb:any) => {
// OK its the turn of given promise, execute it
try {
const res = await p();
delete this.operations[operationId]
// Finished, we end the function in the FIFO
cb(null, res);
} catch (e) {
delete this.operations[operationId]
// Errored, we end the function with an error
cb(e);
}
......
import {GlobalFifoPromise} from "./GlobalFifoPromise"
import {FileDAL} from "../lib/dal/fileDAL"
import {ConfDTO} from "../lib/dto/ConfDTO"
import {DBIdentity} from "../lib/dal/sqliteDAL/IdentityDAL"
import {GLOBAL_RULES_FUNCTIONS, GLOBAL_RULES_HELPERS} from "../lib/rules/global_rules"
import {BlockDTO} from "../lib/dto/BlockDTO"
import {RevocationDTO} from "../lib/dto/RevocationDTO"
import {BasicIdentity, IdentityDTO} from "../lib/dto/IdentityDTO"
import {CertificationDTO} from "../lib/dto/CertificationDTO"
import {DBCert} from "../lib/dal/sqliteDAL/CertDAL"
import {verify} from "../lib/common-libs/crypto/keyring"
import {GlobalFifoPromise} from "./GlobalFifoPromise";
import {FileDAL} from "../lib/dal/fileDAL";
import {ConfDTO} from "../lib/dto/ConfDTO";
import {DBIdentity} from "../lib/dal/sqliteDAL/IdentityDAL";
import {GLOBAL_RULES_FUNCTIONS, GLOBAL_RULES_HELPERS} from "../lib/rules/global_rules";
import {BlockDTO} from "../lib/dto/BlockDTO";
import {RevocationDTO} from "../lib/dto/RevocationDTO";
import {BasicIdentity, IdentityDTO} from "../lib/dto/IdentityDTO";
import {CertificationDTO} from "../lib/dto/CertificationDTO";
import {DBCert} from "../lib/dal/sqliteDAL/CertDAL";
import {verify} from "../lib/common-libs/crypto/keyring";
import {FIFOService} from "./FIFOService";
"use strict";
const constants = require('../lib/constants');
const BY_ABSORPTION = true;
export class IdentityService {
export class IdentityService extends FIFOService {
dal:FileDAL
conf:ConfDTO
logger:any
constructor() {}
constructor(fifoPromiseHandler:GlobalFifoPromise) {
super(fifoPromiseHandler)
}
setConfDAL(newConf:ConfDTO, newDAL:FileDAL) {
this.dal = newDAL;
......@@ -77,7 +80,8 @@ export class IdentityService {
// Force usage of local currency name, do not accept other currencies documents
idtyObj.currency = this.conf.currency;
const createIdentity = idtyObj.rawWithoutSig();
return GlobalFifoPromise.pushFIFO<DBIdentity>(async () => {
const hash = idtyObj.getHash()
return this.pushFIFO<DBIdentity>(hash, async () => {
this.logger.info('⬇ IDTY %s %s', idty.pubkey, idty.uid);
// Check signature's validity
let verified = verify(createIdentity, idty.sig, idty.pubkey);
......@@ -144,7 +148,8 @@ export class IdentityService {
}, BY_ABSORPTION);
}
let anErr:any
return GlobalFifoPromise.pushFIFO<CertificationDTO>(async () => {
const hash = cert.getHash()
return this.pushFIFO<CertificationDTO>(hash, async () => {
this.logger.info('⬇ CERT %s block#%s -> %s', cert.from, cert.block_number, idty.uid);
try {
await GLOBAL_RULES_HELPERS.checkCertificationIsValid(cert, potentialNext, () => Promise.resolve(idty), this.conf, this.dal);
......@@ -207,7 +212,8 @@ export class IdentityService {
obj.currency = this.conf.currency || obj.currency;
const revoc = RevocationDTO.fromJSONObject(obj)
const raw = revoc.rawWithoutSig();
return GlobalFifoPromise.pushFIFO<RevocationDTO>(async () => {
const hash = revoc.getHash()
return this.pushFIFO<RevocationDTO>(hash, async () => {
try {
this.logger.info('⬇ REVOCATION %s %s', revoc.pubkey, revoc.idty_uid);
let verified = verify(raw, revoc.revocation, revoc.pubkey);
......
"use strict";
import {GlobalFifoPromise} from "./GlobalFifoPromise"
import {ConfDTO} from "../lib/dto/ConfDTO"
import {FileDAL} from "../lib/dal/fileDAL"
import {LOCAL_RULES_HELPERS} from "../lib/rules/local_rules"
import {GLOBAL_RULES_HELPERS} from "../lib/rules/global_rules"
import {MembershipDTO} from "../lib/dto/MembershipDTO"
import {GlobalFifoPromise} from "./GlobalFifoPromise";
import {ConfDTO} from "../lib/dto/ConfDTO";
import {FileDAL} from "../lib/dal/fileDAL";
import {LOCAL_RULES_HELPERS} from "../lib/rules/local_rules";
import {GLOBAL_RULES_HELPERS} from "../lib/rules/global_rules";
import {MembershipDTO} from "../lib/dto/MembershipDTO";
import {FIFOService} from "./FIFOService";
const constants = require('../lib/constants');
export class MembershipService {
export class MembershipService extends FIFOService {
constructor(fifoPromiseHandler:GlobalFifoPromise) {
super(fifoPromiseHandler)
}
conf:ConfDTO
dal:FileDAL
......@@ -25,8 +30,9 @@ export class MembershipService {
}
submitMembership(ms:any) {
return GlobalFifoPromise.pushFIFO<MembershipDTO>(async () => {
const entry = MembershipDTO.fromJSONObject(ms)
const hash = entry.getHash()
return this.pushFIFO<MembershipDTO>(hash, async () => {
// Force usage of local currency name, do not accept other currencies documents
entry.currency = this.conf.currency || entry.currency;
this.logger.info('⬇ %s %s', entry.issuer, entry.membership);
......
import {GlobalFifoPromise} from "./GlobalFifoPromise"
import {ConfDTO} from "../lib/dto/ConfDTO"
import {FileDAL} from "../lib/dal/fileDAL"
import {DBPeer} from "../lib/dal/sqliteDAL/PeerDAL"
import {DBBlock} from "../lib/db/DBBlock"
import {Multicaster} from "../lib/streams/multicaster"
import {PeerDTO} from "../lib/dto/PeerDTO"
import {verify} from "../lib/common-libs/crypto/keyring"
import {dos2unix} from "../lib/common-libs/dos2unix"
import {ConfDTO} from "../lib/dto/ConfDTO";
import {FileDAL} from "../lib/dal/fileDAL";
import {DBPeer} from "../lib/dal/sqliteDAL/PeerDAL";
import {DBBlock} from "../lib/db/DBBlock";
import {Multicaster} from "../lib/streams/multicaster";
import {PeerDTO} from "../lib/dto/PeerDTO";
import {verify} from "../lib/common-libs/crypto/keyring";
import {dos2unix} from "../lib/common-libs/dos2unix";
import {rawer} from "../lib/common-libs/index";
import {Server} from "../../server";
import {GlobalFifoPromise} from "./GlobalFifoPromise";
const util = require('util');
const _ = require('underscore');
......@@ -22,6 +22,8 @@ export interface Keyring {
secretKey:string
}
// Note: for an unknown reason, PeeringService cannot extend FIFOService correctly. When this.pushFIFO() is called
// from within submitp(), "this.pushFIFO === undefined" is true.
export class PeeringService {
conf:ConfDTO
......@@ -32,7 +34,7 @@ export class PeeringService {
peerInstance:DBPeer | null
logger:any
constructor(private server:Server) {
constructor(private server:Server, private fifoPromiseHandler:GlobalFifoPromise) {
}
setConfDAL(newConf:ConfDTO, newDAL:FileDAL, newPair:Keyring) {
......@@ -80,7 +82,8 @@ export class PeeringService {
let sigTime = 0;
let block:DBBlock | null;
let makeCheckings = cautious || cautious === undefined;
return GlobalFifoPromise.pushFIFO<PeerDTO>(async () => {
const hash = thePeerDTO.getHash()
return this.fifoPromiseHandler.pushFIFOPromise<PeerDTO>(hash, async () => {
try {
if (makeCheckings) {
let goodSignature = this.checkPeerSignature(thePeerDTO)
......
"use strict";
import {GlobalFifoPromise} from "./GlobalFifoPromise"
import {ConfDTO} from "../lib/dto/ConfDTO"
import {FileDAL} from "../lib/dal/fileDAL"
import {TransactionDTO} from "../lib/dto/TransactionDTO"
import {LOCAL_RULES_HELPERS} from "../lib/rules/local_rules"
import {GLOBAL_RULES_HELPERS} from "../lib/rules/global_rules"
import {DBTx} from "../lib/dal/sqliteDAL/TxsDAL"
import {ConfDTO} from "../lib/dto/ConfDTO";
import {FileDAL} from "../lib/dal/fileDAL";
import {TransactionDTO} from "../lib/dto/TransactionDTO";
import {LOCAL_RULES_HELPERS} from "../lib/rules/local_rules";
import {GLOBAL_RULES_HELPERS} from "../lib/rules/global_rules";
import {DBTx} from "../lib/dal/sqliteDAL/TxsDAL";
import {FIFOService} from "./FIFOService";
import {GlobalFifoPromise} from "./GlobalFifoPromise";
const constants = require('../lib/constants');
const CHECK_PENDING_TRANSACTIONS = true
export class TransactionService {
export class TransactionService extends FIFOService {
constructor(fifoPromiseHandler:GlobalFifoPromise) {
super(fifoPromiseHandler)
}
conf:ConfDTO
dal:FileDAL
......@@ -23,8 +28,9 @@ export class TransactionService {
}
processTx(txObj:any) {
return GlobalFifoPromise.pushFIFO<TransactionDTO>(async () => {
const tx = TransactionDTO.fromJSONObject(txObj, this.conf.currency)
const hash = tx.getHash()
return this.pushFIFO<TransactionDTO>(hash, async () => {
try {
this.logger.info('⬇ TX %s:%s from %s', tx.output_amount, tx.output_base, tx.issuers);
const existing = await this.dal.getTxByHash(tx.hash);
......
......@@ -13,6 +13,7 @@ import {parsers} from "./app/lib/common-libs/parsers/index";
import {Cloneable} from "./app/lib/dto/Cloneable";
import {DuniterDocument, duniterDocument2str} from "./app/lib/common-libs/constants";
import {CrawlerConstants} from "./app/modules/crawler/lib/constants";
import {GlobalFifoPromise} from "./app/service/GlobalFifoPromise";
interface HookableServer {
getMainEndpoint: (...args:any[]) => Promise<any>
......@@ -65,12 +66,14 @@ export class Server extends stream.Duplex implements HookableServer {
this.paramsP = directory.getHomeParams(memoryOnly, home)
const documentFIFO = new GlobalFifoPromise()
this.MerkleService = require("./app/lib/helpers/merkle").processForURL
this.IdentityService = new IdentityService()
this.MembershipService = new MembershipService()
this.PeeringService = new PeeringService(this)
this.BlockchainService = new BlockchainService(this)
this.TransactionsService = new TransactionService()
this.IdentityService = new IdentityService(documentFIFO)
this.MembershipService = new MembershipService(documentFIFO)
this.PeeringService = new PeeringService(this, documentFIFO)
this.BlockchainService = new BlockchainService(this, documentFIFO)
this.TransactionsService = new TransactionService(documentFIFO)
}
// Unused, but made mandatory by Duplex interface
......
......@@ -114,8 +114,9 @@ describe("Continous proof-of-work", function() {
require('../../app/modules/crawler').CrawlerDependency.duniter.methods.pullBlocks(s3),
s3.startBlockComputation()
];
yield s3.expectJSON('/blockchain/current', { number: 15 });
const current = yield s3.get('/blockchain/current')
yield s3.stopBlockComputation();
current.number.should.be.aboveOrEqual(14)
}));
after(() => {
......
"use strict";
const _ = require('underscore');
const co = require('co');
const assert = require('assert');
const user = require('./tools/user');
const commit = require('./tools/commit');
const toolbox = require('./tools/toolbox');
const CommonConstants = require('../../app/lib/common-libs/constants').CommonConstants
const now = 1500000000
let s1, s2, cat, tac
describe("Single document treatment", function() {
before(() => co(function*() {
s1 = toolbox.server({
// The common conf
medianTimeBlocks: 1,
avgGenTime: 11,
udTime0: now,
udReevalTime0: now,
pair: {
pub: 'HgTTJLAQ5sqfknMq7yLPZbehtuLSsKj9CxWN7k8QvYJd',
sec: '51w4fEShBk1jCMauWu4mLpmDVfHksKmWcygpxriqCEZizbtERA6de4STKRkQBpxmMUwsKXRjSzuQ8ECwmqN1u2DP'
}
});
s2 = toolbox.server({
pair: {
pub: '2LvDg21dVXvetTD9GdkPLURavLYEqP3whauvPWX4c2qc',
sec: '2HuRLWgKgED1bVio1tdpeXrf7zuUszv1yPHDsDj7kcMC4rVSN9RC58ogjtKNfTbH1eFz7rn38U1PywNs3m6Q7UxE'
}
});
cat = user('cat', { pub: 'HgTTJLAQ5sqfknMq7yLPZbehtuLSsKj9CxWN7k8QvYJd', sec: '51w4fEShBk1jCMauWu4mLpmDVfHksKmWcygpxriqCEZizbtERA6de4STKRkQBpxmMUwsKXRjSzuQ8ECwmqN1u2DP'}, { server: s1 });
tac = user('tac', { pub: '2LvDg21dVXvetTD9GdkPLURavLYEqP3whauvPWX4c2qc', sec: '2HuRLWgKgED1bVio1tdpeXrf7zuUszv1yPHDsDj7kcMC4rVSN9RC58ogjtKNfTbH1eFz7rn38U1PywNs3m6Q7UxE'}, { server: s1 });
yield s1.prepareForNetwork();
yield s2.prepareForNetwork();
// Publishing identities
yield cat.createIdentity();
yield tac.createIdentity();
yield cat.cert(tac);
yield tac.cert(cat);
yield cat.join();
yield tac.join();
}));
after(() => {
return Promise.all([
s1.closeCluster(),
s2.closeCluster()
])
})
it('should create a common blockchain', () => co(function*() {
const b0 = yield s1.commit({ time: now })
const b1 = yield s1.commit({ time: now + 11 })
const b2 = yield s1.commit({ time: now + 22 })
yield s2.writeBlock(b0)
yield s2.writeBlock(b1)
yield s2.writeBlock(b2)
}))
it('should exist the same block on each node', () => co(function*() {
yield s1.expectJSON('/blockchain/current', {
number: 2
})
yield s2.expectJSON('/blockchain/current', {
number: 2
})
}))
it('should refuse known fork blocks', () => co(function*() {
const p1 = yield s1.getPeer()
// Trigger the multiple writings in parallel
const res = yield Promise.all([
s2.writePeer(p1).then(p => p).catch(e => { assert.equal(e.uerr.message, "Document already under treatment"); return null }),
s2.writePeer(p1).then(p => p).catch(e => { assert.equal(e.uerr.message, "Document already under treatment"); return null }),
s2.writePeer(p1).then(p => p).catch(e => { assert.equal(e.uerr.message, "Document already under treatment"); return null }),
s2.writePeer(p1).then(p => p).catch(e => { assert.equal(e.uerr.message, "Document already under treatment"); return null }),
s2.writePeer(p1).then(p => p).catch(e => { assert.equal(e.uerr.message, "Document already under treatment"); return null }),
s2.writePeer(p1).then(p => p).catch(e => { assert.equal(e.uerr.message, "Document already under treatment"); return null })
])
assert.notEqual(res[0], null)
assert.equal(res[1], null)
assert.equal(res[2], null)
assert.equal(res[3], null)
assert.equal(res[4], null)
assert.equal(res[5], null)
}))
})
......@@ -401,6 +401,10 @@ export class TestingServer {
});
}
async getPeer() {
return this.get('/network/peering')
}
postIdentity(idty:any) {
return this.post('/wot/add', {
identity: idty.getRawSigned()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment