diff --git a/src/main.rs b/src/main.rs index 3e4b1bd3a8d80439e6a8ae75b3c53420e8eaa497..9c84de858840ea63ef060223f2fa45eb1159bd4a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,17 +4,15 @@ use log::{info, warn}; use std::{ net::{IpAddr, SocketAddr}, str::FromStr, - time::Duration, }; use structopt::StructOpt; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tokio::sync::mpsc::UnboundedSender; use warp::Filter as _; -use xmpp::{ClientBuilder, ClientType, Event}; -use xmpp_parsers::{message::MessageType, BareJid, Jid}; +use xmpp_parsers::Jid; mod webhook; mod webserver; -mod xmpp_bot; +mod xmpp_; #[derive(StructOpt, Debug)] #[structopt(name = "Webhook to XMPP MUC bot")] @@ -45,22 +43,34 @@ async fn main() -> anyhow::Result<()> { let jid = opt.jid; let password = opt.password; - let muc = opt.muc; + let muc_jid = opt.muc; let ip = opt.ip; let port = opt.port; let gitlab_token = opt.gitlab_token; pretty_env_logger::init(); - let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); + let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel(); + // Start webserver tokio::spawn( async move { webserver::start(SocketAddr::new(ip, port), gitlab_token, sender).await }, ); - xmpp_bot::run_xmpp_bot(&jid, &password, &muc, receiver) + // Connect to xmpp server + let mut xmpp_agent = xmpp_::XmppAgent::connect(&jid, &password, &[&muc_jid]) .await .map_err(|e| anyhow::anyhow!("XMPP ERROR: {}", e))?; + // Functional logic: convert each received webhook to text message and send it to xmpp room + while let Some(gitlab_web_hook) = receiver.recv().await { + if let Some(message) = webhook::format_webhook(&gitlab_web_hook) { + info!("message: {}", message); + xmpp_agent.send_message(&message, &muc_jid).await; + } else { + warn!("Unhandled webhook payload: {:?}", gitlab_web_hook); + } + } + Ok(()) } diff --git a/src/webserver.rs b/src/webserver.rs index c5329ec94c7ce2ff23714b2f7023f5eff00e464d..7f562f2808895e7bfee152ebc5f8997896d6fd8c 100644 --- a/src/webserver.rs +++ b/src/webserver.rs @@ -12,7 +12,7 @@ impl std::fmt::Debug for BadRequest { impl warp::reject::Reject for BadRequest {} -pub async fn start(addr: SocketAddr, gitlab_token: String, sender: UnboundedSender<String>) { +pub async fn start(addr: SocketAddr, gitlab_token: String, sender: UnboundedSender<WebHook>) { warp::serve(webhook_route(gitlab_token, sender)) .run(addr) .await; @@ -20,7 +20,7 @@ pub async fn start(addr: SocketAddr, gitlab_token: String, sender: UnboundedSend fn webhook_route( gitlab_token: String, - sender: UnboundedSender<String>, + sender: UnboundedSender<WebHook>, ) -> impl warp::Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone { warp::path::end() .and(warp::header::optional::<String>("content-type")) @@ -33,23 +33,17 @@ fn webhook_route( gitlab_token_opt: Option<String>, body: Bytes, expected_gitlab_token: String, - sender: UnboundedSender<String>| async move { + sender: UnboundedSender<WebHook>| async move { if content_type_opt.as_deref() == Some("application/json") { if gitlab_token_opt == Some(expected_gitlab_token) { if let Ok(body_str) = std::str::from_utf8(body.as_ref()) { if let Ok(gitlab_web_hook) = serde_json::from_str(body_str) { - if let Some(message) = webhook::format_webhook(&gitlab_web_hook) { - info!("message: {}", message); - if sender.send(message).is_ok() { - Ok(warp::http::Response::new(OK_STR)) - } else { - Err(warp::reject::custom(BadRequest(anyhow::Error::msg( - "Fail to send gitlab webhook", - )))) - } - } else { - warn!("Unhandled webhook payload: {:?}", gitlab_web_hook); + if sender.send(gitlab_web_hook).is_ok() { Ok(warp::http::Response::new(OK_STR)) + } else { + Err(warp::reject::custom(BadRequest(anyhow::Error::msg( + "Fail to send gitlab webhook", + )))) } } else { Err(warp::reject::custom(BadRequest(anyhow::Error::msg( diff --git a/src/xmpp_.rs b/src/xmpp_.rs new file mode 100644 index 0000000000000000000000000000000000000000..cf1580e9514b3889a24562655174564121d6d486 --- /dev/null +++ b/src/xmpp_.rs @@ -0,0 +1,63 @@ +use std::collections::HashMap; + +use crate::*; +use xmpp::{ClientBuilder, ClientType, Event}; +use xmpp_parsers::{message::MessageType, BareJid}; + +pub(super) struct XmppAgent { + inner: xmpp::Agent, + rooms: HashMap<String, Jid>, +} + +impl XmppAgent { + pub async fn connect(jid: &str, password: &str, rooms: &[&str]) -> Result<Self, xmpp::Error> { + let mut agent = ClientBuilder::new(jid, password) + .set_client(ClientType::Bot, "gitbot") + .set_website("https://git.duniter.org/tools/gitbot") + .set_default_nick("gitbot") + .build()?; + + let mut rooms_map = HashMap::with_capacity(rooms.len()); + let mut rooms_joined = 0; + 'outer: while let Some(events) = agent.wait_for_events().await { + for event in events { + if let Event::Online = event { + info!("XMPP client now online at {}", jid); + for room in rooms { + let room_jid = BareJid::from_str(room)?; + rooms_map.insert((*room).to_owned(), Jid::Bare(room_jid.clone())); + agent + .join_room(room_jid, None, None, "en", "Your friendly hook bot.") + .await; + } + } else if let Event::RoomJoined(jid) = event { + info!("Entered MUC {}", jid); + agent + .send_message( + Jid::Bare(jid), + MessageType::Groupchat, + "en", + "Gitbot started", + ) + .await; + rooms_joined += 1; + if rooms_joined == rooms.len() { + break 'outer; + } + } + } + } + + Ok(Self { + inner: agent, + rooms: rooms_map, + }) + } + pub async fn send_message(&mut self, message: &str, room: &str) { + if let Some(room_jid) = self.rooms.get(room) { + self.inner + .send_message(room_jid.to_owned(), MessageType::Groupchat, "en", message) + .await + } + } +} diff --git a/src/xmpp_bot.rs b/src/xmpp_bot.rs deleted file mode 100644 index 254d705153d107fc62b488ab72fd02ba7941c65f..0000000000000000000000000000000000000000 --- a/src/xmpp_bot.rs +++ /dev/null @@ -1,63 +0,0 @@ -use crate::*; - -const MIN_DURATION_BEETWEEN_XMPP_MSGS: Duration = Duration::from_millis(500); - -pub async fn run_xmpp_bot<'a>( - jid: &'a str, - password: &'a str, - muc_jid: &'a str, - mut receiver: UnboundedReceiver<String>, -) -> Result<(), xmpp::Error> { - let muc_jid: BareJid = match BareJid::from_str(muc_jid) { - Ok(jid) => jid, - Err(err) => panic!("MUC Jid invalid: {:?}", err), - }; - - let mut agent = ClientBuilder::new(jid, password) - .set_client(ClientType::Bot, "gitbot") - .set_website("https://git.duniter.org/tools/gitbot") - .set_default_nick("gitbot") - .build()?; - - // Join room - while let Some(events) = agent.wait_for_events().await { - for event in events { - match event { - Event::Online => { - info!("XMPP client now online at {}", jid); - agent - .join_room(muc_jid.clone(), None, None, "en", "Your friendly hook bot.") - .await; - } - Event::Disconnected => { - info!("XMPP client disconnected"); - } - Event::RoomJoined(jid) => { - info!("Entered MUC {}", jid); - agent - .send_message( - Jid::Bare(muc_jid.to_owned()), - MessageType::Groupchat, - "en", - "Gitbot started", - ) - .await; - while let Some(message) = receiver.recv().await { - agent - .send_message( - Jid::Bare(muc_jid.to_owned()), - MessageType::Groupchat, - "en", - &message, - ) - .await; - tokio::time::sleep(MIN_DURATION_BEETWEEN_XMPP_MSGS).await; - } - } - _ => (), - } - } - } - - Ok(()) -}