Skip to content
Snippets Groups Projects
Select Git revision
  • 9f1450f89dea00d9dac7c7cc43b17cd625faed70
  • master default protected
  • network/gtest-1000 protected
  • upgradable-multisig
  • runtime/gtest-1000
  • network/gdev-800 protected
  • cgeek/issue-297-cpu
  • gdev-800-tests
  • update-docker-compose-rpc-squid-names
  • fix-252
  • 1000i100-test
  • hugo/tmp-0.9.1
  • network/gdev-803 protected
  • hugo/endpoint-gossip
  • network/gdev-802 protected
  • hugo/distance-precompute
  • network/gdev-900 protected
  • tuxmain/anonymous-tx
  • debug/podman
  • hugo/195-doc
  • hugo/195-graphql-schema
  • gtest-1000-0.8.0 protected
  • gtest-1000 protected
  • gdev-900-0.10.1 protected
  • gdev-900-0.10.0 protected
  • gdev-900-0.9.2 protected
  • gdev-800-0.8.0 protected
  • gdev-900-0.9.1 protected
  • gdev-900-0.9.0 protected
  • gdev-803 protected
  • gdev-802 protected
  • runtime-801 protected
  • gdev-800 protected
  • runtime-800-bis protected
  • runtime-800 protected
  • runtime-800-backup protected
  • runtime-701 protected
  • runtime-700 protected
  • runtime-600 protected
  • runtime-500 protected
  • v0.4.1 protected
41 results

build-for-arm.md

Blame
  • test_ws2p.py 2.85 KiB
    import unittest
    import aiohttp
    import jsonschema
    import json
    from duniterpy.documents import BMAEndpoint
    from tests.api.webserver import WebFunctionalSetupMixin, web, asyncio
    from duniterpy.api.bma.network import heads, WS2P_HEADS_SCHEMA
    from duniterpy.api.bma import parse_text
    
    
    class Test_WS2P_Heads(WebFunctionalSetupMixin, unittest.TestCase):
    
        def test_block(self):
            json_sample = {
      "heads": [
        {
          "message": "WS2POCAIC:HEAD:1:8iVdpXqFLCxGyPqgVx5YbFSkmWKkceXveRd2yvBKeARL:102102-000002C0694C7D373A78B095419C86584B81804CFB9641B7EBC3A18040B6FEE6:e66254bf:duniter:1.6.20:1",
          "sig": "ZO5gSUMK6IaUEwU4K40nhuHOfnJ6Zfn8VS+4Ko2FM7t+mDsHf+3gDRT9PgV2p0fz81mF6jVYWpq2UYEsnK/gCg==",
          "messageV2": "WS2POCAIC:HEAD:2:8iVdpXqFLCxGyPqgVx5YbFSkmWKkceXveRd2yvBKeARL:102102-000002C0694C7D373A78B095419C86584B81804CFB9641B7EBC3A18040B6FEE6:e66254bf:duniter:1.6.20:1:15:14",
          "sigV2": "ReXzbgUya9jo4dL/R4g19Y+RE9BGB0xDkw7mrBWoldlRLkq3KFyRkAf9VthVx1UUb/AINr3nxImZKVQiVH9+DQ==",
          "step": 0
        },
        {
          "message": "WS2POCAIC:HEAD:1:2ny7YAdmzReQxAayyJZsyVYwYhVyax2thKcGknmQy5nQ:102102-000002C0694C7D373A78B095419C86584B81804CFB9641B7EBC3A18040B6FEE6:a0a45ed2:duniter:1.6.21:1",
          "sig": "pXLMmOpyEMdWihT183g/rnCvMzA2gHki5Cxg7rEl3psQu0RuK0ObCv5YFhmQnRlg+QZ1CWfbYEEbm3G1eGplAQ==",
          "messageV2": "WS2POCAIC:HEAD:2:2ny7YAdmzReQxAayyJZsyVYwYhVyax2thKcGknmQy5nQ:102102-000002C0694C7D373A78B095419C86584B81804CFB9641B7EBC3A18040B6FEE6:a0a45ed2:duniter:1.6.21:1:34:28",
          "sigV2": "p5f7/KfQqjTaCYSMUXpjUDH7uF2DafetHNgphGzkOXgxM+Upeii0Fz2RFBwnZvN+Gjp81hAqSuH48PJP6HJSAw==",
          "step": 1
        },
        {
          "message": "WS2POCA:HEAD:1:GRBPV3Y7PQnB9LaZhSGuS3BqBJbSHyibzYq65kTh1nQ4:102102-000002C0694C7D373A78B095419C86584B81804CFB9641B7EBC3A18040B6FEE6:6d0e96f9:duniter:1.6.21:1",
          "sig": "h9o1XBEV18gUzbvj1jdQB1M7U8ifZprIyVwLdlSQEfeG9WZLvZAjYzLGA2nD6h/9RkJLOJPzIQJXysHUHJ2dDQ==",
          "messageV2": "WS2POCA:HEAD:2:GRBPV3Y7PQnB9LaZhSGuS3BqBJbSHyibzYq65kTh1nQ4:102102-000002C0694C7D373A78B095419C86584B81804CFB9641B7EBC3A18040B6FEE6:6d0e96f9:duniter:1.6.21:1:20:20",
          "sigV2": "VsyQmXOUYrfHWy0FeS4rJrIJCUBI+3BergbSYQ78icJWV6MQzZSw7Z+Yl7urujCYZriDQM76D6GW+6F0EELpBQ==",
          "step": 2
        },
      ]
    }
    
            jsonschema.validate(json_sample, WS2P_HEADS_SCHEMA)
    
        def test_bma_ws2p_heads_bad(self):
            async def handler(request):
                await request.read()
                return web.Response(body=b'{}', content_type='application/json')
    
            async def go():
                _, port, url = await self.create_server('GET', '/network/ws2p/heads', handler)
                with self.assertRaises(jsonschema.ValidationError):
                    async with aiohttp.ClientSession() as session:
                        connection = next(BMAEndpoint("127.0.0.1", None, None, port).conn_handler(session))
                        await heads(connection)
    
            self.loop.run_until_complete(go())