commit b2da4a0ec679e2071c2a3a981b5e8c5caef83235 from: murilo ijanc date: Fri Nov 21 00:31:08 2025 UTC Refactoring Cli - Move pool_id, concurrency and timeout to global values for all arguments. - For now ignore add group commit - 721919810f017665ec6bbc8ef3f36f81e7913228 commit + b2da4a0ec679e2071c2a3a981b5e8c5caef83235 blob - b45356c7bb31c0713cb769ca56cfaca69174830a blob + 3249cffaf63d913464f8b8e339debdce16f38121 --- src/main.rs +++ src/main.rs @@ -1,3 +1,4 @@ +#![allow(unused)] // // Copyright (c) 2025 murilo ijanc' // @@ -50,6 +51,21 @@ const VERSION: &str = concat!( author, )] struct Cli { + /// Cognito User Pool ID (e.g. us-east-1_XXXXXXXXX). + #[arg(long = "pool-id")] + pub pool_id: String, + + /// Maximum number of users to process concurrently. + /// + /// This controls how many users are handled in parallel when calling + /// the AdminAddUserToGroup API. + #[arg(long = "concurrency", value_name = "N", default_value_t = 4)] + pub concurrency: usize, + + /// Global timeout for the sync operation, in seconds. + #[arg(long)] + timeout: Option, + /// Increase verbosity (use -v, -vv, ...). /// /// When no RUST_LOG is set, a single -v switches the log level to DEBUG. @@ -73,9 +89,8 @@ enum Commands { /// Add users to one or more Cognito groups. Add(AddArgs), - - /// Remove users from one or more Cognito groups. - Del(GroupOperationArgs), + // /// Remove users from one or more Cognito groups. + // Del(GroupOperationArgs), } /// Arguments for the `add` operation. @@ -89,10 +104,6 @@ enum Commands { /// - `concurrency` defines how many users are processed in parallel. #[derive(Debug, Parser)] pub struct AddArgs { - /// Cognito User Pool ID (e.g. us-east-1_XXXXXXXXX). - #[arg(long = "pool-id")] - pub pool_id: String, - /// CSV file generated by the `sync` command (username,email). /// /// This file is loaded into a HashMap and used @@ -126,84 +137,26 @@ pub struct AddArgs { required = true )] pub groups: Vec, - - /// Maximum number of users to process concurrently. - /// - /// This controls how many users are handled in parallel when calling - /// the AdminAddUserToGroup API. - #[arg(long = "concurrency", value_name = "N", default_value_t = 4)] - pub concurrency: usize, - - /// Global timeout for the operation, in seconds. - /// - /// When set, the entire add operation is bounded by this timeout. - #[arg(long = "timeout", value_name = "SECONDS")] - pub timeout: Option, } -/// Common arguments shared by group-based operations. -#[derive(clap::Args, Debug, Clone)] -pub struct CommonOperationArgs { - /// Cognito User Pool ID to operate on. - #[arg(long = "pool-id", env = "COGNITO_USER_POOL_ID")] - pub pool_id: String, - - /// File path used by the operation. - /// For `sync`, this is the output file where usernames and emails are stored as CSV. - #[arg( - short = 'f', - long = "file", - value_name = "PATH", - help = "File path. For `sync`, this is the output CSV file." - )] - pub emails_file: Option, - - /// Maximum duration (in seconds) allowed for the operation. - #[arg(long = "timeout", value_name = "SECONDS")] - pub timeout: Option, - - /// Concurrency level for operations that need it. - #[arg(long = "concurrency", value_name = "N", default_value_t = 1)] - pub concurrency: usize, -} - /// Arguments for the `sync` operation. #[derive(Debug, Parser)] struct SyncArgs { - /// Cognito User Pool ID (e.g. us-east-1_XXXXXXXXX). - #[arg(long = "pool-id")] - pool_id: String, - - /// Optional file containing user e-mails, one per line. + /// CSV file generated by the `sync` command (username,email). /// - /// Depending on the design, this can represent the source of truth - /// to be synchronized with the Cognito user pool. - #[arg(long = "emails-file")] - emails_file: Option, - - /// Optional list of Cognito group names used during synchronization. - /// - /// These can be used to ensure users are added/removed from specific - /// groups during the sync process. - #[arg(long = "group", alias = "groups")] - groups: Vec, - - /// Maximum number of concurrent operations. - #[arg(long)] - concurrency: Option, - - /// Global timeout for the sync operation, in seconds. - #[arg(long)] - timeout: Option, + /// This file is loaded into a HashMap and used + /// as the source of truth for resolving usernames from e-mail. + #[arg( + long = "sync-file", + value_name = "CSV_PATH", + help = "CSV file produced by `sync` containing username,email columns" + )] + sync_file: PathBuf, } /// Arguments shared by `add` and `del` group operations. #[derive(Debug, Parser)] struct GroupOperationArgs { - /// Cognito User Pool ID (e.g. us-east-1_XXXXXXXXX). - #[arg(long = "pool-id")] - pool_id: String, - /// One or more Cognito group names. /// /// All users found in the input file will be added to or removed from @@ -217,14 +170,12 @@ struct GroupOperationArgs { /// selected group operation. #[arg(long = "emails-file")] emails_file: PathBuf, +} - /// Maximum number of concurrent operations. - #[arg(long)] - concurrency: Option, - - /// Global timeout for the operation, in seconds. - #[arg(long)] - timeout: Option, +pub struct CommonOperationArgs { + pub pool_id: String, + pub concurrency: usize, + pub timeout: Option, } #[tokio::main] @@ -234,32 +185,30 @@ async fn main() -> Result<()> { debug!("parsed CLI arguments: {cli:?}"); + let common_args = CommonOperationArgs { + pool_id: cli.pool_id, + concurrency: cli.concurrency, + timeout: cli.timeout, + }; + match cli.command { Commands::Sync(args) => { - let common = CommonOperationArgs { - pool_id: args.pool_id, - emails_file: args.emails_file, - concurrency: 1, //args.concurrency, - timeout: args.timeout, - }; - - run_sync(&common).await?; + run_sync(&common_args, &args).await?; } Commands::Add(args) => { - run_add_groups(args).await?; - } - // Commands::Del(args) => { - // let common = CommonOperationArgs { - // pool_id: args.pool_id, - // groups: args.groups, - // emails_file: Some(args.emails_file), - // concurrency: args.concurrency, - // timeout: args.timeout, - // }; + // run_add_groups(args).await?; + println!("add"); + } // Commands::Del(args) => { + // let common = CommonOperationArgs { + // pool_id: args.pool_id, + // groups: args.groups, + // emails_file: Some(args.emails_file), + // concurrency: args.concurrency, + // timeout: args.timeout, + // }; - // run_remove_groups(common).await?; - // } - _ => unimplemented!(), + // run_remove_groups(common).await?; + // } } Ok(()) @@ -293,31 +242,34 @@ fn init_tracing(verbose: u8) { /// The CSV format is: /// ```text /// username,email -/// user1@example.com,user1@example.com -/// user2@example.com,user2@example.com +/// Google_123,user1@example.com +/// Google_456,user2@example.com /// ... /// ``` /// /// Source of truth is Cognito: this command dumps all users from the pool. /// /// Behavior: +/// /// - Paginates over all Cognito users in the pool. /// - Extracts the `username` field and the `email` attribute (if present). /// - Writes the data as `username,email` to the given output file or stdout. /// - Respects the optional `timeout` passed in `CommonOperationArgs`. -pub async fn run_sync(args: &CommonOperationArgs) -> Result<()> { +async fn run_sync( + common_args: &CommonOperationArgs, + cmd_args: &SyncArgs, +) -> Result<()> { info!( - pool_id = %args.pool_id, + pool_id = %common_args.pool_id, "Starting users sync from Cognito user pool" ); let config = aws_config::load_from_env().await; - // .context("failed to load AWS configuration")?; let client = CognitoClient::new(&config); - let timeout = args.timeout.map(Duration::from_secs); + let timeout = common_args.timeout.map(Duration::from_secs); - let sync_future = sync_users_to_csv(&client, args); + let sync_future = sync_users_to_csv(&client, common_args, cmd_args); if let Some(duration) = timeout { match tokio::time::timeout(duration, sync_future).await { @@ -336,52 +288,52 @@ pub async fn run_sync(args: &CommonOperationArgs) -> R } info!("Users sync completed successfully"); + Ok(()) } -async fn run_add_groups(args: AddArgs) -> Result<()> { - // info!( - // pool_id = %args.pool_id, - // emails_file = ?args.emails_file, - // concurrency = ?args.concurrency, - // timeout = ?args.timeout, - // "add groups operation requested (not implemented yet)" - // ); +// async fn run_add_groups(args: AddArgs) -> Result<()> { +// // info!( +// // pool_id = %args.pool_id, +// // emails_file = ?args.emails_file, +// // concurrency = ?args.concurrency, +// // timeout = ?args.timeout, +// // "add groups operation requested (not implemented yet)" +// // ); - let config = aws_config::load_from_env().await; - let client = CognitoClient::new(&config); +// let config = aws_config::load_from_env().await; +// let client = CognitoClient::new(&config); - add_users_to_groups_from_files( - &client, - &args.pool_id, - &args.sync_file, - &args.emails_file, - &args.groups, - args.concurrency, - ) - .await?; +// add_users_to_groups_from_files( +// &args.pool_id, +// &args.sync_file, +// &args.emails_file, +// &args.groups, +// args.concurrency, +// ) +// .await?; - Ok(()) -} +// Ok(()) +// } -#[allow(dead_code)] -async fn run_remove_groups(args: CommonOperationArgs) -> Result<()> { - info!( - pool_id = %args.pool_id, - emails_file = ?args.emails_file, - concurrency = ?args.concurrency, - timeout = ?args.timeout, - "remove groups operation requested (not implemented yet)" - ); +// #[allow(dead_code)] +// async fn run_remove_groups(args: CommonOperationArgs) -> Result<()> { +// info!( +// pool_id = %args.pool_id, +// emails_file = ?args.emails_file, +// concurrency = ?args.concurrency, +// timeout = ?args.timeout, +// "remove groups operation requested (not implemented yet)" +// ); - if let Some(seconds) = args.timeout { - let _timeout = Duration::from_secs(seconds); - debug!(?seconds, "remove operation timeout configured"); - } +// if let Some(seconds) = args.timeout { +// let _timeout = Duration::from_secs(seconds); +// debug!(?seconds, "remove operation timeout configured"); +// } - // TODO: implement remove-from-groups logic. - Ok(()) -} +// // TODO: implement remove-from-groups logic. +// Ok(()) +// } /// Fetch all users from Cognito and write `username,email` to a CSV destination. /// @@ -389,17 +341,25 @@ async fn run_remove_groups(args: CommonOperationArgs) /// Otherwise, the CSV is written to stdout. pub(crate) async fn sync_users_to_csv( client: &CognitoClient, - args: &CommonOperationArgs, + common_args: &CommonOperationArgs, + cmd_args: &SyncArgs, ) -> Result<()> { - let mut writer: Box = - if let Some(path) = &args.emails_file { - let file = File::create(path).await.with_context(|| { - format!("failed to create output file at '{}'", path.display()) - })?; - Box::new(file) - } else { - Box::new(io::stdout()) - }; + let mut writer = + File::create(&cmd_args.sync_file).await.with_context(|| { + format!( + "failed to create output file at '{}'", + &cmd_args.sync_file.display() + ) + })?; + // let mut writer: Box = + // if let Some(path) = &cmd_args.sync_file { + // let file = File::create(path).await.with_context(|| { + // format!("failed to create output file at '{}'", path.display()) + // })?; + // Box::new(file) + // } else { + // Box::new(io::stdout()) + // }; // CSV header writer @@ -413,8 +373,8 @@ pub(crate) async fn sync_users_to_csv( loop { let mut request = client .list_users() - .user_pool_id(&args.pool_id) - // 60 is the documented default max page size for Cognito ListUsers. + .user_pool_id(&common_args.pool_id) + // https://docs.aws.amazon.com/cognito-user-identity-pools/latest/APIReference/API_ListUsers.html#CognitoUserPools-ListUsers-request-Limit .limit(60); if let Some(ref token) = pagination_token { @@ -429,7 +389,6 @@ pub(crate) async fn sync_users_to_csv( for user in response.users() { let (username, email) = extract_username_and_email(user); - // If you prefer to skip users without email, you can check `email.is_empty()`. let line = format!("{username},{email}\n"); writer .write_all(line.as_bytes()) @@ -457,12 +416,14 @@ pub(crate) async fn sync_users_to_csv( fn extract_username_and_email(user: &UserType) -> (String, String) { let username = user.username().unwrap_or_default().to_string(); + // Here I'm assuming that the user doesn't have an email attribute, so the + // username is the email itself. let email = user .attributes() .iter() .find(|attr| attr.name() == "email") .and_then(|attr| attr.value()) - .unwrap_or_default() + .unwrap_or(&username) .to_string(); (username, email) @@ -550,141 +511,141 @@ pub async fn load_email_list>(path: P) Ok(emails) } -/// Add a Cognito user to one or more groups using the Admin API. -/// -/// This function assumes AWS credentials and permissions allow admin operations. -pub async fn admin_add_user_to_groups( - client: &CognitoClient, - pool_id: &str, - username: &str, - groups: &[String], -) -> Result<()> { - for group in groups { - info!(%username, %group, %pool_id, "adding user to Cognito group"); +// /// Add a Cognito user to one or more groups using the Admin API. +// /// +// /// This function assumes AWS credentials and permissions allow admin operations. +// pub async fn admin_add_user_to_groups( +// client: &CognitoClient, +// pool_id: &str, +// username: &str, +// groups: &[String], +// ) -> Result<()> { +// for group in groups { +// info!(%username, %group, %pool_id, "adding user to Cognito group"); - client - .admin_add_user_to_group() - .user_pool_id(pool_id) - .username(username) - .group_name(group) - .send() - .await - .with_context(|| { - format!( - "failed to add user '{}' to group '{}'", - username, group - ) - })?; +// client +// .admin_add_user_to_group() +// .user_pool_id(pool_id) +// .username(username) +// .group_name(group) +// .send() +// .await +// .with_context(|| { +// format!( +// "failed to add user '{}' to group '{}'", +// username, group +// ) +// })?; - info!(%username, %group, "user successfully added to group"); - } +// info!(%username, %group, "user successfully added to group"); +// } - Ok(()) -} +// Ok(()) +// } -/// High-level flow: -/// 1. Load sync CSV into HashMap. -/// 2. Load target e-mails (one per line). -/// 3. For each email, find username in HashMap. -/// 4. Call `admin_add_user_to_groups` with concurrency limit. -/// 5. Log success or failure for each email. -/// -/// `sync_csv_path`: CSV generated by `sync` (username,email). -/// `emails_list_path`: TXT file with one e-mail per line (users to be added). -pub async fn add_users_to_groups_from_files( - client: &CognitoClient, - pool_id: &str, - sync_csv_path: &Path, - emails_list_path: &Path, - groups: &[String], - concurrency: usize, -) -> Result<()> { - let concurrency = std::cmp::max(1, concurrency); +// /// High-level flow: +// /// 1. Load sync CSV into HashMap. +// /// 2. Load target e-mails (one per line). +// /// 3. For each email, find username in HashMap. +// /// 4. Call `admin_add_user_to_groups` with concurrency limit. +// /// 5. Log success or failure for each email. +// /// +// /// `sync_csv_path`: CSV generated by `sync` (username,email). +// /// `emails_list_path`: TXT file with one e-mail per line (users to be added). +// pub async fn add_users_to_groups_from_files( +// client: &CognitoClient, +// pool_id: &str, +// sync_csv_path: &Path, +// emails_list_path: &Path, +// groups: &[String], +// concurrency: usize, +// ) -> Result<()> { +// let concurrency = std::cmp::max(1, concurrency); - // 1. Load sync CSV into map: email -> username - let email_to_username = read_sync_file_to_map(sync_csv_path).await?; - info!( - total_entries = email_to_username.len(), - "loaded sync map (email -> username)" - ); +// // 1. Load sync CSV into map: email -> username +// let email_to_username = read_sync_file_to_map(sync_csv_path).await?; +// info!( +// total_entries = email_to_username.len(), +// "loaded sync map (email -> username)" +// ); - // 2. Load target e-mails - let emails = load_email_list(emails_list_path).await?; - let total_emails = emails.len(); - info!(total_emails, "loaded target e-mail list"); +// // 2. Load target e-mails +// let emails = load_email_list(emails_list_path).await?; +// let total_emails = emails.len(); +// info!(total_emails, "loaded target e-mail list"); - let pb = Arc::new({ - let bar = ProgressBar::new(total_emails as u64); - bar.set_style( - ProgressStyle::with_template( - "[{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} {msg}", - ) - .unwrap_or_else(|_| ProgressStyle::default_bar()), - ); - bar.set_message("processing users"); - bar - }); +// let pb = Arc::new({ +// let bar = ProgressBar::new(total_emails as u64); +// bar.set_style( +// ProgressStyle::with_template( +// "[{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} {msg}", +// ) +// .unwrap_or_else(|_| ProgressStyle::default_bar()), +// ); +// bar.set_message("processing users"); +// bar +// }); - let semaphore = Arc::new(Semaphore::new(concurrency)); - let mut handles: Vec> = Vec::with_capacity(emails.len()); +// let semaphore = Arc::new(Semaphore::new(concurrency)); +// let mut handles: Vec> = Vec::with_capacity(emails.len()); - for email in emails { - let username = match email_to_username.get(&email) { - Some(u) => u.clone(), - None => { - error!(%email, "e-mail not found in sync map, skipping"); - continue; - } - }; +// for email in emails { +// let username = match email_to_username.get(&email) { +// Some(u) => u.clone(), +// None => { +// error!(%email, "e-mail not found in sync map, skipping"); +// continue; +// } +// }; - let permit = semaphore.clone().acquire_owned().await?; - let client_clone = client.clone(); - let pool_id = pool_id.to_string(); - let groups = groups.to_vec(); - let email_clone = email.clone(); - let pb_clone = pb.clone(); +// let permit = semaphore.clone().acquire_owned().await?; +// let client_clone = client.clone(); +// let pool_id = pool_id.to_string(); +// let groups = groups.to_vec(); +// let email_clone = email.clone(); +// let pb_clone = pb.clone(); - let handle = tokio::spawn(async move { - let _permit = permit; // keep permit alive for the duration of this task +// let handle = tokio::spawn(async move { +// let _permit = permit; // keep permit alive for the duration of this task - if let Err(err) = admin_add_user_to_groups( - &client_clone, - &pool_id, - &username, - &groups, - ) - .await - { - error!( - email = %email_clone, - %username, - error = ?err, - "failed to add user to one or more groups" - ); - } else { - info!( - email = %email_clone, - %username, - groups = ?groups, - "user successfully processed for all groups" - ); - } +// if let Err(err) = admin_add_user_to_groups( +// &client_clone, +// &pool_id, +// &username, +// &groups, +// ) +// .await +// { +// error!( +// email = %email_clone, +// %username, +// error = ?err, +// "failed to add user to one or more groups" +// ); +// } else { +// info!( +// email = %email_clone, +// %username, +// groups = ?groups, +// "user successfully processed for all groups" +// ); +// } - pb_clone.inc(1); - }); +// pb_clone.inc(1); +// }); - handles.push(handle); - } +// handles.push(handle); +// } - // Wait for all tasks to complete - for handle in handles { - // Ignore panics here, just surface as error logs. - if let Err(join_err) = handle.await { - error!(error = ?join_err, "join error while processing a user"); - } - } +// // Wait for all tasks to complete +// for handle in handles { +// // Ignore panics here, just surface as error logs. +// if let Err(join_err) = handle.await { +// error!(error = ?join_err, "join error while processing a user"); +// } +// } - pb.finish_with_message("done"); - info!("finished processing all users for add-groups operation"); - Ok(()) -} +// pb.finish_with_message("done"); +// info!("finished processing all users for add-groups operation"); +// Ok(()) +// }