From 07a259b72c182c838b601ce479c8e20e07af993d Mon Sep 17 00:00:00 2001
From: librelois <c@elo.tf>
Date: Sat, 12 Jun 2021 02:51:34 +0200
Subject: [PATCH] ref(all): Decoupling webserver and xmpp parts & extract
functional logic
---
src/main.rs | 26 ++++++++++++++------
src/webserver.rs | 22 ++++++-----------
src/xmpp_.rs | 63 ++++++++++++++++++++++++++++++++++++++++++++++++
src/xmpp_bot.rs | 63 ------------------------------------------------
4 files changed, 89 insertions(+), 85 deletions(-)
create mode 100644 src/xmpp_.rs
delete mode 100644 src/xmpp_bot.rs
diff --git a/src/main.rs b/src/main.rs
index 3e4b1bd..9c84de8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -4,17 +4,15 @@ use log::{info, warn};
use std::{
net::{IpAddr, SocketAddr},
str::FromStr,
- time::Duration,
};
use structopt::StructOpt;
-use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
+use tokio::sync::mpsc::UnboundedSender;
use warp::Filter as _;
-use xmpp::{ClientBuilder, ClientType, Event};
-use xmpp_parsers::{message::MessageType, BareJid, Jid};
+use xmpp_parsers::Jid;
mod webhook;
mod webserver;
-mod xmpp_bot;
+mod xmpp_;
#[derive(StructOpt, Debug)]
#[structopt(name = "Webhook to XMPP MUC bot")]
@@ -45,22 +43,34 @@ async fn main() -> anyhow::Result<()> {
let jid = opt.jid;
let password = opt.password;
- let muc = opt.muc;
+ let muc_jid = opt.muc;
let ip = opt.ip;
let port = opt.port;
let gitlab_token = opt.gitlab_token;
pretty_env_logger::init();
- let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
+ let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
+ // Start webserver
tokio::spawn(
async move { webserver::start(SocketAddr::new(ip, port), gitlab_token, sender).await },
);
- xmpp_bot::run_xmpp_bot(&jid, &password, &muc, receiver)
+ // Connect to xmpp server
+ let mut xmpp_agent = xmpp_::XmppAgent::connect(&jid, &password, &[&muc_jid])
.await
.map_err(|e| anyhow::anyhow!("XMPP ERROR: {}", e))?;
+ // Functional logic: convert each received webhook to text message and send it to xmpp room
+ while let Some(gitlab_web_hook) = receiver.recv().await {
+ if let Some(message) = webhook::format_webhook(&gitlab_web_hook) {
+ info!("message: {}", message);
+ xmpp_agent.send_message(&message, &muc_jid).await;
+ } else {
+ warn!("Unhandled webhook payload: {:?}", gitlab_web_hook);
+ }
+ }
+
Ok(())
}
diff --git a/src/webserver.rs b/src/webserver.rs
index c5329ec..7f562f2 100644
--- a/src/webserver.rs
+++ b/src/webserver.rs
@@ -12,7 +12,7 @@ impl std::fmt::Debug for BadRequest {
impl warp::reject::Reject for BadRequest {}
-pub async fn start(addr: SocketAddr, gitlab_token: String, sender: UnboundedSender<String>) {
+pub async fn start(addr: SocketAddr, gitlab_token: String, sender: UnboundedSender<WebHook>) {
warp::serve(webhook_route(gitlab_token, sender))
.run(addr)
.await;
@@ -20,7 +20,7 @@ pub async fn start(addr: SocketAddr, gitlab_token: String, sender: UnboundedSend
fn webhook_route(
gitlab_token: String,
- sender: UnboundedSender<String>,
+ sender: UnboundedSender<WebHook>,
) -> impl warp::Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
warp::path::end()
.and(warp::header::optional::<String>("content-type"))
@@ -33,23 +33,17 @@ fn webhook_route(
gitlab_token_opt: Option<String>,
body: Bytes,
expected_gitlab_token: String,
- sender: UnboundedSender<String>| async move {
+ sender: UnboundedSender<WebHook>| async move {
if content_type_opt.as_deref() == Some("application/json") {
if gitlab_token_opt == Some(expected_gitlab_token) {
if let Ok(body_str) = std::str::from_utf8(body.as_ref()) {
if let Ok(gitlab_web_hook) = serde_json::from_str(body_str) {
- if let Some(message) = webhook::format_webhook(&gitlab_web_hook) {
- info!("message: {}", message);
- if sender.send(message).is_ok() {
- Ok(warp::http::Response::new(OK_STR))
- } else {
- Err(warp::reject::custom(BadRequest(anyhow::Error::msg(
- "Fail to send gitlab webhook",
- ))))
- }
- } else {
- warn!("Unhandled webhook payload: {:?}", gitlab_web_hook);
+ if sender.send(gitlab_web_hook).is_ok() {
Ok(warp::http::Response::new(OK_STR))
+ } else {
+ Err(warp::reject::custom(BadRequest(anyhow::Error::msg(
+ "Fail to send gitlab webhook",
+ ))))
}
} else {
Err(warp::reject::custom(BadRequest(anyhow::Error::msg(
diff --git a/src/xmpp_.rs b/src/xmpp_.rs
new file mode 100644
index 0000000..cf1580e
--- /dev/null
+++ b/src/xmpp_.rs
@@ -0,0 +1,63 @@
+use std::collections::HashMap;
+
+use crate::*;
+use xmpp::{ClientBuilder, ClientType, Event};
+use xmpp_parsers::{message::MessageType, BareJid};
+
+pub(super) struct XmppAgent {
+ inner: xmpp::Agent,
+ rooms: HashMap<String, Jid>,
+}
+
+impl XmppAgent {
+ pub async fn connect(jid: &str, password: &str, rooms: &[&str]) -> Result<Self, xmpp::Error> {
+ let mut agent = ClientBuilder::new(jid, password)
+ .set_client(ClientType::Bot, "gitbot")
+ .set_website("https://git.duniter.org/tools/gitbot")
+ .set_default_nick("gitbot")
+ .build()?;
+
+ let mut rooms_map = HashMap::with_capacity(rooms.len());
+ let mut rooms_joined = 0;
+ 'outer: while let Some(events) = agent.wait_for_events().await {
+ for event in events {
+ if let Event::Online = event {
+ info!("XMPP client now online at {}", jid);
+ for room in rooms {
+ let room_jid = BareJid::from_str(room)?;
+ rooms_map.insert((*room).to_owned(), Jid::Bare(room_jid.clone()));
+ agent
+ .join_room(room_jid, None, None, "en", "Your friendly hook bot.")
+ .await;
+ }
+ } else if let Event::RoomJoined(jid) = event {
+ info!("Entered MUC {}", jid);
+ agent
+ .send_message(
+ Jid::Bare(jid),
+ MessageType::Groupchat,
+ "en",
+ "Gitbot started",
+ )
+ .await;
+ rooms_joined += 1;
+ if rooms_joined == rooms.len() {
+ break 'outer;
+ }
+ }
+ }
+ }
+
+ Ok(Self {
+ inner: agent,
+ rooms: rooms_map,
+ })
+ }
+ pub async fn send_message(&mut self, message: &str, room: &str) {
+ if let Some(room_jid) = self.rooms.get(room) {
+ self.inner
+ .send_message(room_jid.to_owned(), MessageType::Groupchat, "en", message)
+ .await
+ }
+ }
+}
diff --git a/src/xmpp_bot.rs b/src/xmpp_bot.rs
deleted file mode 100644
index 254d705..0000000
--- a/src/xmpp_bot.rs
+++ /dev/null
@@ -1,63 +0,0 @@
-use crate::*;
-
-const MIN_DURATION_BEETWEEN_XMPP_MSGS: Duration = Duration::from_millis(500);
-
-pub async fn run_xmpp_bot<'a>(
- jid: &'a str,
- password: &'a str,
- muc_jid: &'a str,
- mut receiver: UnboundedReceiver<String>,
-) -> Result<(), xmpp::Error> {
- let muc_jid: BareJid = match BareJid::from_str(muc_jid) {
- Ok(jid) => jid,
- Err(err) => panic!("MUC Jid invalid: {:?}", err),
- };
-
- let mut agent = ClientBuilder::new(jid, password)
- .set_client(ClientType::Bot, "gitbot")
- .set_website("https://git.duniter.org/tools/gitbot")
- .set_default_nick("gitbot")
- .build()?;
-
- // Join room
- while let Some(events) = agent.wait_for_events().await {
- for event in events {
- match event {
- Event::Online => {
- info!("XMPP client now online at {}", jid);
- agent
- .join_room(muc_jid.clone(), None, None, "en", "Your friendly hook bot.")
- .await;
- }
- Event::Disconnected => {
- info!("XMPP client disconnected");
- }
- Event::RoomJoined(jid) => {
- info!("Entered MUC {}", jid);
- agent
- .send_message(
- Jid::Bare(muc_jid.to_owned()),
- MessageType::Groupchat,
- "en",
- "Gitbot started",
- )
- .await;
- while let Some(message) = receiver.recv().await {
- agent
- .send_message(
- Jid::Bare(muc_jid.to_owned()),
- MessageType::Groupchat,
- "en",
- &message,
- )
- .await;
- tokio::time::sleep(MIN_DURATION_BEETWEEN_XMPP_MSGS).await;
- }
- }
- _ => (),
- }
- }
- }
-
- Ok(())
-}
--
GitLab