From f6fa7be517882c7f7be9103c61c56526152d8ca1 Mon Sep 17 00:00:00 2001 From: cgeek <cem.moreau@gmail.com> Date: Sun, 12 May 2019 14:52:53 +0200 Subject: [PATCH] [enh] refactor GraphQL to separate files --- back/webmin/graphql/test/hello.graphql | 3 - back/webmin/graphql/test/sycnhronize.graphql | 3 - back/webmin/graphql/test/ws2pInfos.graphql | 22 -- back/webmin/queries/gql-current.ts | 5 + back/webmin/queries/gql-heads.ts | 10 + back/webmin/queries/gql-node-state.ts | 48 ++++ back/webmin/queries/gql-synchronize.ts | 49 ++++ back/webmin/queries/gql-ws2p.ts | 59 ++++ back/webmin/{graphql => }/schema.graphqls | 0 back/webmin/shared/gql-network.ts | 5 + .../subscriptions/gql-events-blockchain.ts | 16 ++ .../subscriptions/gql-events-documents.ts | 73 +++++ .../subscriptions/gql-events-sync-progress.ts | 15 + back/webmin/webmin.ts | 256 ++---------------- common/dto.ts | 4 +- 15 files changed, 307 insertions(+), 261 deletions(-) delete mode 100644 back/webmin/graphql/test/hello.graphql delete mode 100644 back/webmin/graphql/test/sycnhronize.graphql delete mode 100644 back/webmin/graphql/test/ws2pInfos.graphql create mode 100644 back/webmin/queries/gql-current.ts create mode 100644 back/webmin/queries/gql-heads.ts create mode 100644 back/webmin/queries/gql-node-state.ts create mode 100644 back/webmin/queries/gql-synchronize.ts create mode 100644 back/webmin/queries/gql-ws2p.ts rename back/webmin/{graphql => }/schema.graphqls (100%) create mode 100644 back/webmin/shared/gql-network.ts create mode 100644 back/webmin/subscriptions/gql-events-blockchain.ts create mode 100644 back/webmin/subscriptions/gql-events-documents.ts create mode 100644 back/webmin/subscriptions/gql-events-sync-progress.ts diff --git a/back/webmin/graphql/test/hello.graphql b/back/webmin/graphql/test/hello.graphql deleted file mode 100644 index 2f9e86e..0000000 --- a/back/webmin/graphql/test/hello.graphql +++ /dev/null @@ -1,3 +0,0 @@ -query { - hello -} \ No newline at end of file diff --git a/back/webmin/graphql/test/sycnhronize.graphql b/back/webmin/graphql/test/sycnhronize.graphql deleted file mode 100644 index 46b1b8d..0000000 --- a/back/webmin/graphql/test/sycnhronize.graphql +++ /dev/null @@ -1,3 +0,0 @@ -query { - synchronize(url: "g1.duniter.org:443") -} \ No newline at end of file diff --git a/back/webmin/graphql/test/ws2pInfos.graphql b/back/webmin/graphql/test/ws2pInfos.graphql deleted file mode 100644 index 625b59a..0000000 --- a/back/webmin/graphql/test/ws2pInfos.graphql +++ /dev/null @@ -1,22 +0,0 @@ - -query { - ws2pinfos { - softVersions { - pubkeys - software - version - } - level1 { - pubkey - handle - uid - ws2pid - } - level2 { - pubkey - handle - uid - ws2pid - } - } -} diff --git a/back/webmin/queries/gql-current.ts b/back/webmin/queries/gql-current.ts new file mode 100644 index 0000000..1c66170 --- /dev/null +++ b/back/webmin/queries/gql-current.ts @@ -0,0 +1,5 @@ +import {Server} from 'duniter/server' + +export function gqlCurrent(server: Server) { + return () => server.dal.getCurrentBlockOrNull() +} \ No newline at end of file diff --git a/back/webmin/queries/gql-heads.ts b/back/webmin/queries/gql-heads.ts new file mode 100644 index 0000000..fb2bb50 --- /dev/null +++ b/back/webmin/queries/gql-heads.ts @@ -0,0 +1,10 @@ +import {Server} from 'duniter/server' + +export function gqlHeads(server: Server) { + return () => { + if (server.ws2pCluster) { + return server.ws2pCluster.getKnownHeads() + } + return [] + } +} diff --git a/back/webmin/queries/gql-node-state.ts b/back/webmin/queries/gql-node-state.ts new file mode 100644 index 0000000..caa024f --- /dev/null +++ b/back/webmin/queries/gql-node-state.ts @@ -0,0 +1,48 @@ +import {Server} from 'duniter/server' +import {Querable} from 'duniter/app/lib/common-libs/querable' +import {getSyncPromise} from './gql-synchronize' + +let started = false + +export function gqlNodeStart(startServices: () => Promise<void>) { + return async () => { + await startServices() + return started = true + } +} + +export function gqlStopAndResetData(server: Server, stopServices: () => Promise<void>) { + return async () => { + await stopServices() + started = false + + // Reset all the node's data + await server.dal.close() + await server.resetData() + await server.dal.init(server.conf) + + return true + } +} + +export function gqlNodeState(server: Server) { + return async () => { + const syncPromise = getSyncPromise() // Binding to sync GQL module + const current = await server.dal.getCurrentBlockOrNull() + if (syncPromise.isFulfilled() && current && !started) { + return 'READY_FOR_START' + } + else if (syncPromise.isFulfilled() && current) { + return 'STARTED' + } + else if (!current) { + return 'READY_FOR_SYNC' + } + else if (!syncPromise.isFulfilled()) { + return 'SYNCHRONIZING' + } + else { + return 'UNKNOWN' + } + } +} diff --git a/back/webmin/queries/gql-synchronize.ts b/back/webmin/queries/gql-synchronize.ts new file mode 100644 index 0000000..f952c2f --- /dev/null +++ b/back/webmin/queries/gql-synchronize.ts @@ -0,0 +1,49 @@ +import {Server} from 'duniter/server' +import {Querable, querablep} from 'duniter/app/lib/common-libs/querable' +import {CrawlerDependency} from 'duniter/app/modules/crawler' +import {SyncEnding, SyncProgress} from '../../../common/dto' +import {gqlPushSyncProgress} from '../subscriptions/gql-events-sync-progress' + +let syncPromise: Querable<void> = querablep(Promise.resolve()) + +export function getSyncPromise() { + return syncPromise +} + +export function gqlIsSyncStarted() { + return () => !syncPromise.isFulfilled() +} + +export function gqlSynchronize(server: Server) { + return async (_: any, { url }: { url: string }) => { + // Wait for (eventual) last sync to end + try { + await syncPromise + } catch (e) { + console.error(e) + } + + // Reset all the node's data + await server.dal.close() + await server.resetData() + await server.dal.init(server.conf) + + // Begin sync + const host = url.split(':')[0] + const port = url.split(':')[1] + const sync = CrawlerDependency.duniter.methods.synchronize(server, host, parseInt(port), undefined as any, 250) + + // Publish the flow of synchronization for the UI + sync.flow.on('data', async (syncProgress: SyncEnding|SyncProgress) => { + if (syncProgress.sync !== undefined) { + const ending = syncProgress as SyncEnding + await gqlPushSyncProgress({ + error: ending.msg && ending.msg.message + }) + } + await gqlPushSyncProgress(syncProgress) + }) + + syncPromise = querablep(sync.syncPromise) + } +} diff --git a/back/webmin/queries/gql-ws2p.ts b/back/webmin/queries/gql-ws2p.ts new file mode 100644 index 0000000..7463d12 --- /dev/null +++ b/back/webmin/queries/gql-ws2p.ts @@ -0,0 +1,59 @@ +import {Server} from 'duniter/server' +import {WS2PConnection} from 'duniter/app/modules/ws2p/lib/WS2PConnection' +import {softVersions} from '../shared/gql-network' + +export function gqlWs2pInfos(server: Server) { + return async () => { + if (server.ws2pCluster) { + let level1 = await server.ws2pCluster.getLevel1Connections() + let level2 = await server.ws2pCluster.getLevel2Connections() + const theSoftVersions: { + software: string + version: string + pubkeys: string[] + }[] = [] + Object.keys(softVersions).map((soft: any) => { + Object.keys(softVersions[soft]).map((version: any) => { + theSoftVersions.push({ + software: soft, + version, + pubkeys: softVersions[soft][version] + }) + }) + }) + return { + softVersions: theSoftVersions, + level1: await level1.map(c => ws2pConnectionToJSON(server, c)), + level2: await level2.map(c => ws2pConnectionToJSON(server, c)) + } + } else { + return { + softVersions: [], + level1: [], + level2: [] + } + } + } +} + +async function ws2pConnectionToJSON(server: Server, connection: WS2PConnection|any) { + const pubkey = connection.pubkey + const ws2pid = connection.uuid + const member = await server.dal.getWrittenIdtyByPubkey(pubkey) + if (connection.ws._socket.server) { + return { + pubkey: connection.pubkey, + ws2pid: connection.uuid, + uid: member ? member.uid : '', + handle: connection.ws._socket.server._connectionKey.split(':').slice(1).join(':') + } + } + else { + return { + pubkey: connection.pubkey, + ws2pid: connection.uuid, + uid: member ? member.uid : '', + handle: [connection.ws._socket.remoteAddress, connection.ws._socket.remotePort].join(':') + } + } +} diff --git a/back/webmin/graphql/schema.graphqls b/back/webmin/schema.graphqls similarity index 100% rename from back/webmin/graphql/schema.graphqls rename to back/webmin/schema.graphqls diff --git a/back/webmin/shared/gql-network.ts b/back/webmin/shared/gql-network.ts new file mode 100644 index 0000000..8e7a6d7 --- /dev/null +++ b/back/webmin/shared/gql-network.ts @@ -0,0 +1,5 @@ +export const softVersions: { + [software: string]: { + [version: string]: string[] + } +} = {} diff --git a/back/webmin/subscriptions/gql-events-blockchain.ts b/back/webmin/subscriptions/gql-events-blockchain.ts new file mode 100644 index 0000000..9c8635f --- /dev/null +++ b/back/webmin/subscriptions/gql-events-blockchain.ts @@ -0,0 +1,16 @@ +import {Server} from 'duniter/server' +import {GraphQLSubscriptions} from '../constants' +import {pubsub} from '../webmin' + +export function gqlSubscribeBlockchainEvents(server: Server) { + + server.on('data', async (data: any) => { + if (data.bcEvent) { + return pubsub.publish(GraphQLSubscriptions.BC_EVENTS, {bcEvents: data}) + } + }) + + return { + subscribe: () => pubsub.asyncIterator(GraphQLSubscriptions.BC_EVENTS) + } +} \ No newline at end of file diff --git a/back/webmin/subscriptions/gql-events-documents.ts b/back/webmin/subscriptions/gql-events-documents.ts new file mode 100644 index 0000000..bd67881 --- /dev/null +++ b/back/webmin/subscriptions/gql-events-documents.ts @@ -0,0 +1,73 @@ +import {Server} from 'duniter/server' +import {softVersions} from '../shared/gql-network' +import {GraphQLSubscriptions} from '../constants' +import {pubsub} from '../webmin' + +export function gqlSubscribeDocuments(server: Server) { + + server.on('data', async (data: any) => { + + const newDocuments = { + blocks: [] as any[], + identities: [] as any[], + certifications: [] as any[], + memberships: [] as any[], + transactions: [] as any[], + peers: [] as any[], + ws2pHeads: [] as any[], + ws2pConnections: [] as any[], + ws2pDisconnections: [] as any[], + } + if (data.joiners) { + newDocuments.blocks.push(data) + } + else if (data.ws2p === 'heads') { + data.added.forEach((h: any) => { + // Trace all the versions of all WS2P heads + const [conf, type, version, pub, blockstamp, uuid, soft, softV, room1, room2] = h.message.split(':') + const softObj = softVersions[soft] || (softVersions[soft] = {}) + if (!softVersions[soft]) { + softVersions[soft] = {} + } + const softVer = softObj[softV] || (softObj[softV] = []) + if (!softVer.includes(pub)) { + softVer.push(pub) + } + newDocuments.ws2pHeads.push(h) + }) + } + else if (data.ws2p === 'connected') { + newDocuments.ws2pConnections.push({ + host: data.to.host, + port: data.to.port, + pub: data.to.pubkey, + }) + } + else if (data.ws2p === 'disconnected') { + newDocuments.ws2pDisconnections.push({ + pub: data.peer.pub, + }) + } + else if (data.endpoints) { + newDocuments.peers.push(data) + } + else if (data.inputs) { + newDocuments.transactions.push(data) + } + else if (data.membership) { + newDocuments.memberships.push(data) + } + else if (data.from) { + newDocuments.certifications.push(data) + } + else if (data.uid) { + newDocuments.identities.push(data) + } + + await pubsub.publish(GraphQLSubscriptions.NEW_DOCUMENTS, { newDocuments }) + }) + + return { + subscribe: () => pubsub.asyncIterator(GraphQLSubscriptions.NEW_DOCUMENTS) + } +} diff --git a/back/webmin/subscriptions/gql-events-sync-progress.ts b/back/webmin/subscriptions/gql-events-sync-progress.ts new file mode 100644 index 0000000..b79c325 --- /dev/null +++ b/back/webmin/subscriptions/gql-events-sync-progress.ts @@ -0,0 +1,15 @@ +import {GraphQLSubscriptions} from '../constants' +import {SyncProgress} from '../../../common/dto' +import {pubsub} from '../webmin' + +export function gqlPushSyncProgress(progress: SyncProgress) { + return pubsub.publish(GraphQLSubscriptions.SYNC_PROGRESS, { + syncProgress: progress + }) +} + +export function gqlSubscribeSyncProgress() { + return { + subscribe: () => pubsub.asyncIterator(GraphQLSubscriptions.SYNC_PROGRESS) + } +} diff --git a/back/webmin/webmin.ts b/back/webmin/webmin.ts index 795a2ec..4e9a5a7 100644 --- a/back/webmin/webmin.ts +++ b/back/webmin/webmin.ts @@ -2,253 +2,47 @@ import * as path from 'path'; import * as fs from 'fs'; import {makeExecutableSchema} from 'graphql-tools'; import {Server} from 'duniter/server'; -import {CrawlerDependency} from 'duniter/app/modules/crawler'; import {PubSub} from 'graphql-subscriptions'; -import {GraphQLSubscriptions} from './constants' -import {Querable, querablep} from 'duniter/app/lib/common-libs/querable' -import {NodeState, SoftVersions} from '../../common/types' -import {SyncProgress, SyncEnding} from '../../common/dto' -import {WS2PConnection} from 'duniter/app/modules/ws2p/lib/WS2PConnection' import {gqlUid} from './queries/gql-uid' +import {gqlIsSyncStarted, gqlSynchronize} from './queries/gql-synchronize' +import {gqlNodeStart, gqlNodeState, gqlStopAndResetData} from './queries/gql-node-state' +import {gqlWs2pInfos} from './queries/gql-ws2p' +import {gqlSubscribeSyncProgress} from './subscriptions/gql-events-sync-progress' +import {gqlSubscribeBlockchainEvents} from './subscriptions/gql-events-blockchain' +import {gqlSubscribeDocuments} from './subscriptions/gql-events-documents' +import {gqlHeads} from './queries/gql-heads' +import {gqlCurrent} from './queries/gql-current' export const pubsub = new PubSub(); -let syncPromise: Querable<void> = querablep(Promise.resolve()) - export function plugModule(server: Server, startServices: () => Promise<void>, stopServices: () => Promise<void>) { - const schemaFile = path.join(__dirname, 'graphql', 'schema.graphqls'); - const typeDefs = fs.readFileSync(schemaFile, 'utf8'); - - const softVersions: { - [software: string]: { - [version: string]: string[] - } - } = {} - - async function ws2pConnectionToJSON(connection:WS2PConnection|any) { - const pubkey = connection.pubkey - const ws2pid = connection.uuid - const member = await server.dal.getWrittenIdtyByPubkey(pubkey) - if (connection.ws._socket.server) { - return { - pubkey: connection.pubkey, - ws2pid: connection.uuid, - uid: member ? member.uid : '', - handle: connection.ws._socket.server._connectionKey.split(':').slice(1).join(':') - } - } - else { - return { - pubkey: connection.pubkey, - ws2pid: connection.uuid, - uid: member ? member.uid : '', - handle: [connection.ws._socket.remoteAddress, connection.ws._socket.remotePort].join(':') - } - } - } - - server.on('data', async (data: any) => { - if (data.bcEvent) { - return pubsub.publish(GraphQLSubscriptions.BC_EVENTS, { bcEvents: data }) - } - - const newDocuments = { - blocks: [] as any[], - identities: [] as any[], - certifications: [] as any[], - memberships: [] as any[], - transactions: [] as any[], - peers: [] as any[], - ws2pHeads: [] as any[], - ws2pConnections: [] as any[], - ws2pDisconnections: [] as any[], - } - if (data.joiners) { - newDocuments.blocks.push(data) - } - else if (data.ws2p === 'heads') { - data.added.forEach((h: any) => { - // Trace all the versions of all WS2P heads - const [conf, type, version, pub, blockstamp, uuid, soft, softV, room1, room2] = h.message.split(':') - const softObj = softVersions[soft] || (softVersions[soft] = {}) - if (!softVersions[soft]) { - softVersions[soft] = {} - } - const softVer = softObj[softV] || (softObj[softV] = []) - if (!softVer.includes(pub)) { - softVer.push(pub) - } - newDocuments.ws2pHeads.push(h) - }) - } - else if (data.ws2p === 'connected') { - newDocuments.ws2pConnections.push({ - host: data.to.host, - port: data.to.port, - pub: data.to.pubkey, - }) - } - else if (data.ws2p === 'disconnected') { - newDocuments.ws2pDisconnections.push({ - pub: data.peer.pub, - }) - } - else if (data.endpoints) { - newDocuments.peers.push(data) - } - else if (data.inputs) { - newDocuments.transactions.push(data) - } - else if (data.membership) { - newDocuments.memberships.push(data) - } - else if (data.from) { - newDocuments.certifications.push(data) - } - else if (data.uid) { - newDocuments.identities.push(data) - } - await pubsub.publish(GraphQLSubscriptions.NEW_DOCUMENTS, { newDocuments }) - }) + return makeExecutableSchema({ - let started = false + // Read the GraphQL schema definition + typeDefs: fs.readFileSync(path.join(__dirname, 'schema.graphqls'), 'utf8'), - return makeExecutableSchema({ - typeDefs, resolvers: { Query: { - hello: () => 'Welcome to Duniter WEBMIN API.', - - isSyncStarted: () => !syncPromise.isFulfilled(), - - current: () => server.dal.getCurrentBlockOrNull(), - - heads: () => { - if (server.ws2pCluster) { - return server.ws2pCluster.getKnownHeads() - } - return [] - }, - - ws2pinfos: async () => { - if (server.ws2pCluster) { - let level1 = await server.ws2pCluster.getLevel1Connections() - let level2 = await server.ws2pCluster.getLevel2Connections() - const theSoftVersions: { - software: string - version: string - pubkeys: string[] - }[] = [] - Object.keys(softVersions).map((soft: any) => { - Object.keys(softVersions[soft]).map((version: any) => { - theSoftVersions.push({ - software: soft, - version, - pubkeys: softVersions[soft][version] - }) - }) - }) - return { - softVersions: theSoftVersions, - level1: await level1.map(ws2pConnectionToJSON), - level2: await level2.map(ws2pConnectionToJSON) - } - } else { - return { - softVersions: [], - level1: [], - level2: [] - } - } - }, - - nodeState: async (): Promise<NodeState> => { - const current = await server.dal.getCurrentBlockOrNull() - if (syncPromise.isFulfilled() && current && !started) { - return 'READY_FOR_START' - } - else if (syncPromise.isFulfilled() && current) { - return 'STARTED' - } - else if (!current) { - return 'READY_FOR_SYNC' - } - else if (!syncPromise.isFulfilled()) { - return 'SYNCHRONIZING' - } - else { - return 'UNKNOWN' - } - }, - - stopAndResetData: async (): Promise<boolean> => { - await stopServices() - started = false - - // Reset all the node's data - await server.dal.close() - await server.resetData() - await server.dal.init(server.conf) - - return true - }, - - startNode: async (): Promise<boolean> => { - await startServices() - return started = true - }, - - synchronize: async (_: any, { url }: { url: string }) => { - // Wait for (eventual) last sync to end - try { - await syncPromise - } catch (e) { - console.error(e) - } - - // Reset all the node's data - await server.dal.close() - await server.resetData() - await server.dal.init(server.conf) - - // Begin sync - const host = url.split(':')[0] - const port = url.split(':')[1] - const sync = CrawlerDependency.duniter.methods.synchronize(server, host, parseInt(port), undefined as any, 250) - - // Publish the flow of synchronization for the UI - sync.flow.on('data', async (syncProgress: SyncEnding|SyncProgress) => { - if (syncProgress.sync !== undefined) { - const ending = syncProgress as SyncEnding - await pubsub.publish(GraphQLSubscriptions.SYNC_PROGRESS, { - syncProgress: { - error: ending.msg && ending.msg.message - } - }) - } - await pubsub.publish(GraphQLSubscriptions.SYNC_PROGRESS, { syncProgress }) - }) - - syncPromise = querablep(sync.syncPromise) - }, - - uid: gqlUid(server) + hello: () => 'Welcome to Duniter WEBMIN API.', + isSyncStarted: gqlIsSyncStarted(), + current: gqlCurrent(server), + heads: gqlHeads(server), + ws2pinfos: gqlWs2pInfos(server), + nodeState: gqlNodeState(server), + stopAndResetData: gqlStopAndResetData(server, stopServices), + startNode: gqlNodeStart(startServices), + synchronize: gqlSynchronize(server), + uid: gqlUid(server) }, Subscription: { - syncProgress: { - subscribe: () => pubsub.asyncIterator(GraphQLSubscriptions.SYNC_PROGRESS) - }, - newDocuments: { - subscribe: () => pubsub.asyncIterator(GraphQLSubscriptions.NEW_DOCUMENTS) - }, - bcEvents: { - subscribe: () => pubsub.asyncIterator(GraphQLSubscriptions.BC_EVENTS) - } + syncProgress: gqlSubscribeSyncProgress(), + newDocuments: gqlSubscribeDocuments(server), + bcEvents: gqlSubscribeBlockchainEvents(server) }, } }) -} \ No newline at end of file +} diff --git a/common/dto.ts b/common/dto.ts index 8ac5b5d..c2248a5 100644 --- a/common/dto.ts +++ b/common/dto.ts @@ -5,8 +5,8 @@ export interface SyncProgress { sandbox?: number peersSync?: number sync?: boolean - error?: Error - p2pData: any + error?: string + p2pData?: any } export interface SyncEnding { -- GitLab