Commit Diff


commit - 159646d3ac75937fe0b83a2f97f52ce9418510eb
commit + 6116f744308210e8f722a63b08a5d714ec7b7c40
blob - ea8c4bf7f35f6f77f75d92ad8ce8349f6e81ddba
blob + ed7cf036e55b15528829f63580e1131885dabf1d
--- .gitignore
+++ .gitignore
@@ -1 +1,2 @@
 /target
+.mail_sync
blob - c01caf510419879c5bc6f7cbec1102e9df4669f0
blob + ce03734d18a04a393b07ddb9f8f2fa8f46efcdff
--- Cargo.lock
+++ Cargo.lock
@@ -1273,6 +1273,15 @@ source = "registry+https://github.com/rust-lang/crates
 checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77"
 
 [[package]]
+name = "lock_api"
+version = "0.4.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965"
+dependencies = [
+ "scopeguard",
+]
+
+[[package]]
 name = "log"
 version = "0.4.28"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1378,6 +1387,29 @@ source = "registry+https://github.com/rust-lang/crates
 checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e"
 
 [[package]]
+name = "parking_lot"
+version = "0.12.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a"
+dependencies = [
+ "lock_api",
+ "parking_lot_core",
+]
+
+[[package]]
+name = "parking_lot_core"
+version = "0.9.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1"
+dependencies = [
+ "cfg-if",
+ "libc",
+ "redox_syscall",
+ "smallvec",
+ "windows-link",
+]
+
+[[package]]
 name = "percent-encoding"
 version = "2.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1445,6 +1477,15 @@ source = "registry+https://github.com/rust-lang/crates
 checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
 
 [[package]]
+name = "redox_syscall"
+version = "0.5.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d"
+dependencies = [
+ "bitflags",
+]
+
+[[package]]
 name = "regex"
 version = "1.12.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1620,6 +1661,12 @@ dependencies = [
 ]
 
 [[package]]
+name = "scopeguard"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
+
+[[package]]
 name = "sct"
 version = "0.7.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1866,6 +1913,7 @@ dependencies = [
  "bytes",
  "libc",
  "mio",
+ "parking_lot",
  "pin-project-lite",
  "signal-hook-registry",
  "socket2 0.6.1",
blob - 9ad96f231a7da19d7f9ad5121831913f19159994
blob + 2221b8157522e67a09fad9f0c67101f15f5aff01
--- Cargo.toml
+++ Cargo.toml
@@ -14,7 +14,7 @@ anyhow = "1.0.100"
 aws-config = { version = "1.8.10", features = ["behavior-version-latest"] }
 aws-sdk-cognitoidentityprovider = "1.103.0"
 clap = { version = "4.5.52", features = ["derive", "env"] }
-tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread", "fs", "io-std"] }
+tokio = { version = "1.48.0", features = ["full"] }
 tracing = "0.1.41"
 tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
 
blob - 896804d1a7e2f3f51bae5bad0af3dfab9a297776
blob + aaac2ae2d7af614e1b350ea0fe82a5934b1aaba6
--- src/main.rs
+++ src/main.rs
@@ -21,22 +21,25 @@
 // accept timeout
 // accept poolid
 
-
 mod helper;
 
+use std::collections::HashMap;
+use std::path::Path;
 use std::path::PathBuf;
+use std::sync::Arc;
 use std::time::Duration;
 
 use anyhow::{Context, Result};
-use clap::{ArgAction, Parser, Subcommand};
-use tracing::{debug, info};
-use tracing_subscriber::EnvFilter;
-use tokio::fs::File;
-use tokio::io::{self, AsyncWrite, AsyncWriteExt};
 use aws_sdk_cognitoidentityprovider::Client as CognitoClient;
 use aws_sdk_cognitoidentityprovider::types::UserType;
+use clap::{ArgAction, Parser, Subcommand};
+use tokio::fs::File;
+use tokio::io::{self, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader};
+use tokio::sync::Semaphore;
+use tokio::task::JoinHandle;
+use tracing::{debug, error, info};
+use tracing_subscriber::EnvFilter;
 
-
 const LONG_VERSION: &str = concat!(
     env!("CARGO_PKG_NAME"),
     " ",
@@ -81,12 +84,75 @@ enum Commands {
     Sync(SyncArgs),
 
     /// Add users to one or more Cognito groups.
-    Add(GroupOperationArgs),
+    Add(AddArgs),
 
     /// Remove users from one or more Cognito groups.
     Del(GroupOperationArgs),
 }
 
+/// Arguments for the `add` operation.
+///
+/// High-level flow:
+/// - `sync_file` is the CSV produced by `sync` (username,email).
+/// - `emails_file` is a plain text file with one e-mail per line,
+///   representing the users that should be added to the given groups.
+/// - `groups` is the list of Cognito group names.
+/// - `pool_id` is the Cognito User Pool ID.
+/// - `concurrency` defines how many users are processed in parallel.
+#[derive(clap::Args, Debug, Clone)]
+pub struct AddArgs {
+    /// Cognito User Pool ID (e.g. us-east-1_XXXXXXXXX).
+    #[arg(long = "pool-id")]
+    pub pool_id: String,
+
+    /// CSV file generated by the `sync` command (username,email).
+    ///
+    /// This file is loaded into a HashMap<email, username> and used
+    /// as the source of truth for resolving usernames from e-mail.
+    #[arg(
+        long = "sync-file",
+        value_name = "CSV_PATH",
+        help = "CSV file produced by `sync` containing username,email columns"
+    )]
+    pub sync_file: PathBuf,
+
+    /// Text file containing one e-mail per line.
+    ///
+    /// Each e-mail will be normalized (trim + lowercase) and then
+    /// looked up in the sync CSV map to obtain the corresponding username.
+    #[arg(
+        long = "emails-file",
+        value_name = "TXT_PATH",
+        help = "Plain text file with one e-mail per line to be added to the groups"
+    )]
+    pub emails_file: PathBuf,
+
+    /// One or more Cognito group names.
+    ///
+    /// All resolved users will be added to every group listed here.
+    #[arg(
+        long = "group",
+        alias = "groups",
+        value_name = "GROUP",
+        num_args = 1..,
+        required = true
+    )]
+    pub groups: Vec<String>,
+
+    /// Maximum number of users to process concurrently.
+    ///
+    /// This controls how many users are handled in parallel when calling
+    /// the AdminAddUserToGroup API. A value of 0 is treated as 1.
+    #[arg(long = "concurrency", value_name = "N", default_value_t = 4)]
+    pub concurrency: usize,
+
+    /// Global timeout for the operation, in seconds.
+    ///
+    /// When set, the entire add operation is bounded by this timeout.
+    #[arg(long = "timeout", value_name = "SECONDS")]
+    pub timeout: Option<u64>,
+}
+
 /// Common arguments shared by group-based operations.
 #[derive(clap::Args, Debug, Clone)]
 pub struct CommonOperationArgs {
@@ -175,6 +241,11 @@ struct GroupOperationArgs {
 
 #[tokio::main]
 async fn main() -> Result<()> {
+    // let users = read_sync_file_to_map(".mail_sync").await?;
+
+    // for (email, username) in &users {
+    // println!("{email} => {username}");
+    // }
     let cli = Cli::parse();
     init_tracing(cli.verbose);
 
@@ -191,18 +262,9 @@ async fn main() -> Result<()> {
 
             run_sync(&common).await?;
         }
-        _ => unimplemented!(),
-        // Commands::Add(args) => {
-        //     let common = CommonOperationArgs {
-        //         pool_id: args.pool_id,
-        //         groups: args.groups,
-        //         emails_file: Some(args.emails_file),
-        //         concurrency: args.concurrency,
-        //         timeout: args.timeout,
-        //     };
-
-        //     run_add_groups(common).await?;
-        // }
+        Commands::Add(args) => {
+            run_add_groups(args).await?;
+        }
         // Commands::Del(args) => {
         //     let common = CommonOperationArgs {
         //         pool_id: args.pool_id,
@@ -214,6 +276,7 @@ async fn main() -> Result<()> {
 
         //     run_remove_groups(common).await?;
         // }
+        _ => unimplemented!(),
     }
 
     Ok(())
@@ -265,9 +328,8 @@ pub async fn run_sync(args: &CommonOperationArgs) -> R
         "Starting users sync from Cognito user pool"
     );
 
-    let config = aws_config::load_from_env()
-        .await;
-        // .context("failed to load AWS configuration")?;
+    let config = aws_config::load_from_env().await;
+    // .context("failed to load AWS configuration")?;
     let client = CognitoClient::new(&config);
 
     let timeout = args.timeout.map(Duration::from_secs);
@@ -294,21 +356,28 @@ pub async fn run_sync(args: &CommonOperationArgs) -> R
     Ok(())
 }
 
-async fn run_add_groups(args: CommonOperationArgs) -> Result<()> {
-    info!(
-        pool_id = %args.pool_id,
-        emails_file = ?args.emails_file,
-        concurrency = ?args.concurrency,
-        timeout = ?args.timeout,
-        "add groups operation requested (not implemented yet)"
-    );
+/// Run the `add` subcommand; `--timeout`, when set, bounds the whole operation.
+async fn run_add_groups(args: AddArgs) -> Result<()> {
+    info!(
+        pool_id = %args.pool_id, groups = ?args.groups, concurrency = args.concurrency,
+        timeout = ?args.timeout, "add groups operation requested"
+    );

-    if let Some(seconds) = args.timeout {
-        let _timeout = Duration::from_secs(seconds);
-        debug!(?seconds, "add operation timeout configured");
-    }
+    let work = async {
+        let client = CognitoClient::new(&aws_config::load_from_env().await);

-    // TODO: implement add-to-groups logic.
+        add_users_to_groups_from_files(
+            &client, &args.pool_id, &args.sync_file, &args.emails_file,
+            &args.groups, args.concurrency,
+        )
+        .await
+    };
+    match args.timeout.map(Duration::from_secs) {
+        Some(limit) => tokio::time::timeout(limit, work)
+            .await
+            .with_context(|| format!("add operation timed out after {limit:?}"))??,
+        None => work.await?,
+    }
     Ok(())
 }
 
@@ -334,15 +403,19 @@ async fn run_remove_groups(args: CommonOperationArgs) 
 ///
 /// If `args.emails_file` is set, the CSV is written to that file.
 /// Otherwise, the CSV is written to stdout.
-pub(crate)async fn sync_users_to_csv(client: &CognitoClient, args: &CommonOperationArgs) -> Result<()> {
-    let mut writer: Box<dyn AsyncWrite + Unpin + Send> = if let Some(path) = &args.emails_file {
-        let file = File::create(path)
-            .await
-            .with_context(|| format!("failed to create output file at '{}'", path.display()))?;
-        Box::new(file)
-    } else {
-        Box::new(io::stdout())
-    };
+pub(crate) async fn sync_users_to_csv(
+    client: &CognitoClient,
+    args: &CommonOperationArgs,
+) -> Result<()> {
+    let mut writer: Box<dyn AsyncWrite + Unpin + Send> =
+        if let Some(path) = &args.emails_file {
+            let file = File::create(path).await.with_context(|| {
+                format!("failed to create output file at '{}'", path.display())
+            })?;
+            Box::new(file)
+        } else {
+            Box::new(io::stdout())
+        };
 
     // CSV header
     writer
@@ -382,9 +455,8 @@ pub(crate)async fn sync_users_to_csv(client: &CognitoC
             total_users += 1;
         }
 
-        pagination_token = response
-            .pagination_token()
-            .map(|token| token.to_owned());
+        pagination_token =
+            response.pagination_token().map(|token| token.to_owned());
 
         if pagination_token.is_none() {
             break;
@@ -412,3 +484,270 @@ fn extract_username_and_email(user: &UserType) -> (Str
     (username, email)
 }

+/// Normalize an e-mail address for lookups.
+///
+/// Strips a leading UTF-8 byte-order mark (some editors write one at the
+/// start of a text file), trims surrounding whitespace and lowercases the
+/// result, so e-mails from the sync CSV and from the e-mail list compare
+/// equal regardless of formatting.
+fn normalize_email(raw: &str) -> String {
+    raw.trim_start_matches('\u{feff}').trim().to_lowercase()
+}
+
+/// Read a Cognito sync CSV file ("username,email") and return a
+/// HashMap<email, username>.
+///
+/// Expected CSV format:
+/// ```text
+/// username,email
+/// johndoe,john@example.com
+/// alice,alice@example.com
+/// ...
+/// ```
+///
+/// Notes:
+/// - The first line is skipped as a header when its e-mail column contains
+///   no `@`; otherwise it is parsed as data, so a header-less file loses no user.
+/// - Fields are split on plain commas (quoted fields are not supported);
+///   columns after the second are ignored.
+/// - Lines missing either username or email are ignored.
+/// - E-mails are normalized (trimmed + lowercased) to allow normalized lookups.
+/// - If several usernames share one e-mail, a warning is logged and the
+///   last one wins.
+///
+/// # Errors
+///
+/// Returns an error if the file cannot be opened or read.
+pub async fn read_sync_file_to_map<P: AsRef<Path>>(
+    path: P,
+) -> Result<HashMap<String, String>> {
+    let path = path.as_ref();
+    let file = File::open(path).await.with_context(|| {
+        format!("failed to open sync file '{}'", path.display())
+    })?;
+
+    let mut lines = BufReader::new(file).lines();
+    let mut map = HashMap::<String, String>::new();
+    let mut line_no = 0usize;
+
+    while let Some(line) = lines.next_line().await.with_context(|| {
+        format!("failed to read sync file '{}'", path.display())
+    })? {
+        line_no += 1;
+
+        let mut columns = line.split(',');
+        let (Some(username), Some(email)) = (columns.next(), columns.next()) else {
+            if !line.trim().is_empty() {
+                debug!(line_no, "malformed sync file line (fewer than two columns), skipping");
+            }
+            continue;
+        };
+        let username = username.trim_start_matches('\u{feff}').trim();
+        let email = normalize_email(email);
+
+        // The header row (e.g. "username,email") has no '@' in its e-mail column.
+        if line_no == 1 && !email.contains('@') {
+            continue;
+        }
+        if username.is_empty() || email.is_empty() {
+            continue;
+        }
+
+        if let Some(previous) = map.insert(email.clone(), username.to_owned()) {
+            if previous != username {
+                tracing::warn!(
+                    %email,
+                    %previous,
+                    current = %username,
+                    "e-mail maps to several usernames in sync file, keeping the last one"
+                );
+            }
+        }
+    }
+
+    Ok(map)
+}
+
+/// Load a plain-text file containing one e-mail per line.
+///
+/// Each line is normalized (BOM stripped, trimmed, lowercased) and empty
+/// lines are skipped. Duplicates are dropped, keeping the order of first
+/// appearance, so each user is processed only once.
+///
+/// # Errors
+///
+/// Returns an error if the file cannot be opened or read.
+pub async fn load_email_list<P: AsRef<Path>>(path: P) -> Result<Vec<String>> {
+    let path = path.as_ref();
+    let file = File::open(path).await.with_context(|| {
+        format!("failed to open e-mail list file '{}'", path.display())
+    })?;
+
+    let mut lines = BufReader::new(file).lines();
+    let mut seen = std::collections::HashSet::new();
+    let mut emails = Vec::new();
+
+    while let Some(line) = lines.next_line().await.with_context(|| {
+        format!("failed to read e-mail list file '{}'", path.display())
+    })? {
+        let email = normalize_email(&line);
+        if email.is_empty() {
+            continue;
+        }
+        if seen.insert(email.clone()) {
+            emails.push(email);
+        } else {
+            debug!(%email, "duplicate e-mail in list, skipping");
+        }
+    }
+
+    Ok(emails)
+}
+
+/// Add a Cognito user to one or more groups using the Admin API.
+///
+/// This function assumes AWS credentials and permissions allow admin operations.
+pub async fn admin_add_user_to_groups(
+    client: &CognitoClient,
+    pool_id: &str,
+    username: &str,
+    groups: &[String],
+) -> Result<()> {
+    for group in groups {
+        info!(%username, %group, %pool_id, "adding user to Cognito group");
+
+        client
+            .admin_add_user_to_group()
+            .user_pool_id(pool_id)
+            .username(username)
+            .group_name(group)
+            .send()
+            .await
+            .with_context(|| {
+                format!(
+                    "failed to add user '{}' to group '{}'",
+                    username, group
+                )
+            })?;
+
+        info!(%username, %group, "user successfully added to group");
+    }
+
+    Ok(())
+}
+
+/// Add every user listed in `emails_list_path` to all of `groups`.
+///
+/// High-level flow:
+/// 1. Load sync CSV into HashMap<email, username>.
+/// 2. Load target e-mails (one per line, de-duplicated).
+/// 3. For each email, find username in HashMap.
+/// 4. Call `admin_add_user_to_groups` with at most `concurrency` users in
+///    flight (0 is treated as 1).
+/// 5. Log success or failure for each email.
+///
+/// `sync_csv_path`: CSV generated by `sync` (username,email).
+/// `emails_list_path`: TXT file with one e-mail per line (users to be added).
+///
+/// # Errors
+///
+/// Returns an error if either input file cannot be read. Otherwise every
+/// e-mail is attempted, and an error is returned at the end if any e-mail
+/// was missing from the sync map, any Cognito call failed, or any worker
+/// task panicked, so callers (and the CLI exit status) see partial failure.
+pub async fn add_users_to_groups_from_files(
+    client: &CognitoClient,
+    pool_id: &str,
+    sync_csv_path: &Path,
+    emails_list_path: &Path,
+    groups: &[String],
+    concurrency: usize,
+) -> Result<()> {
+    let concurrency = concurrency.max(1);
+
+    // 1. Load sync CSV into map: email -> username
+    let email_to_username = read_sync_file_to_map(sync_csv_path).await?;
+    info!(
+        total_entries = email_to_username.len(),
+        "loaded sync map (email -> username)"
+    );
+
+    // 2. Load target e-mails
+    let emails = load_email_list(emails_list_path).await?;
+    info!(total_emails = emails.len(), "loaded target e-mail list");
+
+    // Read-only inputs shared by every task: cloning an `Arc` is a refcount
+    // bump instead of a fresh copy of the pool id and group list per user.
+    let pool_id: Arc<str> = Arc::from(pool_id);
+    let groups: Arc<[String]> = Arc::from(groups);
+    let semaphore = Arc::new(Semaphore::new(concurrency));
+    let mut handles: Vec<JoinHandle<bool>> = Vec::with_capacity(emails.len());
+    let mut unresolved = 0usize;
+
+    for email in emails {
+        let Some(username) = email_to_username.get(&email).cloned() else {
+            error!(%email, "e-mail not found in sync map, skipping");
+            unresolved += 1;
+            continue;
+        };
+
+        // Acquiring before spawning caps the number of live tasks, not just
+        // the number of in-flight API calls.
+        let permit = Arc::clone(&semaphore).acquire_owned().await?;
+        let client = client.clone();
+        let pool_id = Arc::clone(&pool_id);
+        let groups = Arc::clone(&groups);
+
+        handles.push(tokio::spawn(async move {
+            let _permit = permit; // keep permit alive for the duration of this task
+
+            match admin_add_user_to_groups(&client, &pool_id, &username, &groups).await {
+                Ok(()) => {
+                    info!(
+                        %email,
+                        %username,
+                        groups = ?groups,
+                        "user successfully processed for all groups"
+                    );
+                    true
+                }
+                Err(err) => {
+                    error!(
+                        %email,
+                        %username,
+                        error = ?err,
+                        "failed to add user to one or more groups"
+                    );
+                    false
+                }
+            }
+        }));
+    }
+
+    // Wait for all tasks; a panicked task counts as a failed user.
+    let mut failed = 0usize;
+    for handle in handles {
+        match handle.await {
+            Ok(true) => {}
+            Ok(false) => failed += 1,
+            Err(join_err) => {
+                error!(error = ?join_err, "join error while processing a user");
+                failed += 1;
+            }
+        }
+    }
+
+    info!(
+        unresolved,
+        failed,
+        "finished processing all users for add-groups operation"
+    );
+
+    if unresolved > 0 || failed > 0 {
+        anyhow::bail!(
+            "add-groups operation incomplete: {unresolved} e-mail(s) not found in sync map, \
+             {failed} user(s) failed"
+        );
+    }
+    Ok(())
+}