Skip to content
Snippets Groups Projects
Commit 07a259b7 authored by Éloïs's avatar Éloïs
Browse files

ref(all): Decoupling webserver and xmpp parts & extract functional logic

parent a47cb326
No related branches found
No related tags found
No related merge requests found
Pipeline #12780 passed
......@@ -4,17 +4,15 @@ use log::{info, warn};
use std::{
net::{IpAddr, SocketAddr},
str::FromStr,
time::Duration,
};
use structopt::StructOpt;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::mpsc::UnboundedSender;
use warp::Filter as _;
use xmpp::{ClientBuilder, ClientType, Event};
use xmpp_parsers::{message::MessageType, BareJid, Jid};
use xmpp_parsers::Jid;
mod webhook;
mod webserver;
mod xmpp_bot;
mod xmpp_;
#[derive(StructOpt, Debug)]
#[structopt(name = "Webhook to XMPP MUC bot")]
......@@ -45,22 +43,34 @@ async fn main() -> anyhow::Result<()> {
let jid = opt.jid;
let password = opt.password;
let muc = opt.muc;
let muc_jid = opt.muc;
let ip = opt.ip;
let port = opt.port;
let gitlab_token = opt.gitlab_token;
pretty_env_logger::init();
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
// Start webserver
tokio::spawn(
async move { webserver::start(SocketAddr::new(ip, port), gitlab_token, sender).await },
);
xmpp_bot::run_xmpp_bot(&jid, &password, &muc, receiver)
// Connect to xmpp server
let mut xmpp_agent = xmpp_::XmppAgent::connect(&jid, &password, &[&muc_jid])
.await
.map_err(|e| anyhow::anyhow!("XMPP ERROR: {}", e))?;
// Functional logic: convert each received webhook to text message and send it to xmpp room
while let Some(gitlab_web_hook) = receiver.recv().await {
if let Some(message) = webhook::format_webhook(&gitlab_web_hook) {
info!("message: {}", message);
xmpp_agent.send_message(&message, &muc_jid).await;
} else {
warn!("Unhandled webhook payload: {:?}", gitlab_web_hook);
}
}
Ok(())
}
......@@ -12,7 +12,7 @@ impl std::fmt::Debug for BadRequest {
impl warp::reject::Reject for BadRequest {}
pub async fn start(addr: SocketAddr, gitlab_token: String, sender: UnboundedSender<String>) {
pub async fn start(addr: SocketAddr, gitlab_token: String, sender: UnboundedSender<WebHook>) {
warp::serve(webhook_route(gitlab_token, sender))
.run(addr)
.await;
......@@ -20,7 +20,7 @@ pub async fn start(addr: SocketAddr, gitlab_token: String, sender: UnboundedSend
fn webhook_route(
gitlab_token: String,
sender: UnboundedSender<String>,
sender: UnboundedSender<WebHook>,
) -> impl warp::Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
warp::path::end()
.and(warp::header::optional::<String>("content-type"))
......@@ -33,24 +33,18 @@ fn webhook_route(
gitlab_token_opt: Option<String>,
body: Bytes,
expected_gitlab_token: String,
sender: UnboundedSender<String>| async move {
sender: UnboundedSender<WebHook>| async move {
if content_type_opt.as_deref() == Some("application/json") {
if gitlab_token_opt == Some(expected_gitlab_token) {
if let Ok(body_str) = std::str::from_utf8(body.as_ref()) {
if let Ok(gitlab_web_hook) = serde_json::from_str(body_str) {
if let Some(message) = webhook::format_webhook(&gitlab_web_hook) {
info!("message: {}", message);
if sender.send(message).is_ok() {
if sender.send(gitlab_web_hook).is_ok() {
Ok(warp::http::Response::new(OK_STR))
} else {
Err(warp::reject::custom(BadRequest(anyhow::Error::msg(
"Fail to send gitlab webhook",
))))
}
} else {
warn!("Unhandled webhook payload: {:?}", gitlab_web_hook);
Ok(warp::http::Response::new(OK_STR))
}
} else {
Err(warp::reject::custom(BadRequest(anyhow::Error::msg(
"Fail to deserialize gitlab webhook",
......
use std::collections::HashMap;
use crate::*;
use xmpp::{ClientBuilder, ClientType, Event};
use xmpp_parsers::{message::MessageType, BareJid};
pub(super) struct XmppAgent {
inner: xmpp::Agent,
rooms: HashMap<String, Jid>,
}
impl XmppAgent {
pub async fn connect(jid: &str, password: &str, rooms: &[&str]) -> Result<Self, xmpp::Error> {
let mut agent = ClientBuilder::new(jid, password)
.set_client(ClientType::Bot, "gitbot")
.set_website("https://git.duniter.org/tools/gitbot")
.set_default_nick("gitbot")
.build()?;
let mut rooms_map = HashMap::with_capacity(rooms.len());
let mut rooms_joined = 0;
'outer: while let Some(events) = agent.wait_for_events().await {
for event in events {
if let Event::Online = event {
info!("XMPP client now online at {}", jid);
for room in rooms {
let room_jid = BareJid::from_str(room)?;
rooms_map.insert((*room).to_owned(), Jid::Bare(room_jid.clone()));
agent
.join_room(room_jid, None, None, "en", "Your friendly hook bot.")
.await;
}
} else if let Event::RoomJoined(jid) = event {
info!("Entered MUC {}", jid);
agent
.send_message(
Jid::Bare(jid),
MessageType::Groupchat,
"en",
"Gitbot started",
)
.await;
rooms_joined += 1;
if rooms_joined == rooms.len() {
break 'outer;
}
}
}
}
Ok(Self {
inner: agent,
rooms: rooms_map,
})
}
pub async fn send_message(&mut self, message: &str, room: &str) {
if let Some(room_jid) = self.rooms.get(room) {
self.inner
.send_message(room_jid.to_owned(), MessageType::Groupchat, "en", message)
.await
}
}
}
use crate::*;
const MIN_DURATION_BEETWEEN_XMPP_MSGS: Duration = Duration::from_millis(500);
pub async fn run_xmpp_bot<'a>(
jid: &'a str,
password: &'a str,
muc_jid: &'a str,
mut receiver: UnboundedReceiver<String>,
) -> Result<(), xmpp::Error> {
let muc_jid: BareJid = match BareJid::from_str(muc_jid) {
Ok(jid) => jid,
Err(err) => panic!("MUC Jid invalid: {:?}", err),
};
let mut agent = ClientBuilder::new(jid, password)
.set_client(ClientType::Bot, "gitbot")
.set_website("https://git.duniter.org/tools/gitbot")
.set_default_nick("gitbot")
.build()?;
// Join room
while let Some(events) = agent.wait_for_events().await {
for event in events {
match event {
Event::Online => {
info!("XMPP client now online at {}", jid);
agent
.join_room(muc_jid.clone(), None, None, "en", "Your friendly hook bot.")
.await;
}
Event::Disconnected => {
info!("XMPP client disconnected");
}
Event::RoomJoined(jid) => {
info!("Entered MUC {}", jid);
agent
.send_message(
Jid::Bare(muc_jid.to_owned()),
MessageType::Groupchat,
"en",
"Gitbot started",
)
.await;
while let Some(message) = receiver.recv().await {
agent
.send_message(
Jid::Bare(muc_jid.to_owned()),
MessageType::Groupchat,
"en",
&message,
)
.await;
tokio::time::sleep(MIN_DURATION_BEETWEEN_XMPP_MSGS).await;
}
}
_ => (),
}
}
}
Ok(())
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment