commit - 721919810f017665ec6bbc8ef3f36f81e7913228
commit + b2da4a0ec679e2071c2a3a981b5e8c5caef83235
blob - b45356c7bb31c0713cb769ca56cfaca69174830a
blob + 3249cffaf63d913464f8b8e339debdce16f38121
--- src/main.rs
+++ src/main.rs
+#![allow(unused)]
//
// Copyright (c) 2025 murilo ijanc' <murilo@ijanc.org>
//
author,
)]
struct Cli {
+ /// Cognito User Pool ID (e.g. us-east-1_XXXXXXXXX).
+ #[arg(long = "pool-id")]
+ pub pool_id: String,
+
+ /// Maximum number of users to process concurrently.
+ ///
+ /// This controls how many users are handled in parallel when calling
+ /// the AdminAddUserToGroup API.
+ #[arg(long = "concurrency", value_name = "N", default_value_t = 4)]
+ pub concurrency: usize,
+
+ /// Global timeout for the sync operation, in seconds.
+ #[arg(long)]
+ timeout: Option<u64>,
+
/// Increase verbosity (use -v, -vv, ...).
///
/// When no RUST_LOG is set, a single -v switches the log level to DEBUG.
/// Add users to one or more Cognito groups.
Add(AddArgs),
-
- /// Remove users from one or more Cognito groups.
- Del(GroupOperationArgs),
+ // /// Remove users from one or more Cognito groups.
+ // Del(GroupOperationArgs),
}
/// Arguments for the `add` operation.
/// - `concurrency` defines how many users are processed in parallel.
#[derive(Debug, Parser)]
pub struct AddArgs {
- /// Cognito User Pool ID (e.g. us-east-1_XXXXXXXXX).
- #[arg(long = "pool-id")]
- pub pool_id: String,
-
/// CSV file generated by the `sync` command (username,email).
///
/// This file is loaded into a HashMap<email, username> and used
required = true
)]
pub groups: Vec<String>,
-
- /// Maximum number of users to process concurrently.
- ///
- /// This controls how many users are handled in parallel when calling
- /// the AdminAddUserToGroup API.
- #[arg(long = "concurrency", value_name = "N", default_value_t = 4)]
- pub concurrency: usize,
-
- /// Global timeout for the operation, in seconds.
- ///
- /// When set, the entire add operation is bounded by this timeout.
- #[arg(long = "timeout", value_name = "SECONDS")]
- pub timeout: Option<u64>,
}
-/// Common arguments shared by group-based operations.
-#[derive(clap::Args, Debug, Clone)]
-pub struct CommonOperationArgs {
- /// Cognito User Pool ID to operate on.
- #[arg(long = "pool-id", env = "COGNITO_USER_POOL_ID")]
- pub pool_id: String,
-
- /// File path used by the operation.
- /// For `sync`, this is the output file where usernames and emails are stored as CSV.
- #[arg(
- short = 'f',
- long = "file",
- value_name = "PATH",
- help = "File path. For `sync`, this is the output CSV file."
- )]
- pub emails_file: Option<PathBuf>,
-
- /// Maximum duration (in seconds) allowed for the operation.
- #[arg(long = "timeout", value_name = "SECONDS")]
- pub timeout: Option<u64>,
-
- /// Concurrency level for operations that need it.
- #[arg(long = "concurrency", value_name = "N", default_value_t = 1)]
- pub concurrency: usize,
-}
-
/// Arguments for the `sync` operation.
#[derive(Debug, Parser)]
struct SyncArgs {
- /// Cognito User Pool ID (e.g. us-east-1_XXXXXXXXX).
- #[arg(long = "pool-id")]
- pool_id: String,
-
- /// Optional file containing user e-mails, one per line.
+ /// Output CSV file written by the `sync` command (username,email).
///
- /// Depending on the design, this can represent the source of truth
- /// to be synchronized with the Cognito user pool.
- #[arg(long = "emails-file")]
- emails_file: Option<PathBuf>,
-
- /// Optional list of Cognito group names used during synchronization.
- ///
- /// These can be used to ensure users are added/removed from specific
- /// groups during the sync process.
- #[arg(long = "group", alias = "groups")]
- groups: Vec<String>,
-
- /// Maximum number of concurrent operations.
- #[arg(long)]
- concurrency: Option<usize>,
-
- /// Global timeout for the sync operation, in seconds.
- #[arg(long)]
- timeout: Option<u64>,
+ /// The file is created (truncating any existing content) and filled with
+ /// the pool's users; `add` later loads it to resolve usernames from e-mail.
+ #[arg(
+ long = "sync-file",
+ value_name = "CSV_PATH",
+ help = "CSV file produced by `sync` containing username,email columns"
+ )]
+ sync_file: PathBuf,
}
/// Arguments shared by `add` and `del` group operations.
#[derive(Debug, Parser)]
struct GroupOperationArgs {
- /// Cognito User Pool ID (e.g. us-east-1_XXXXXXXXX).
- #[arg(long = "pool-id")]
- pool_id: String,
-
/// One or more Cognito group names.
///
/// All users found in the input file will be added to or removed from
/// selected group operation.
#[arg(long = "emails-file")]
emails_file: PathBuf,
+}
- /// Maximum number of concurrent operations.
- #[arg(long)]
- concurrency: Option<usize>,
-
- /// Global timeout for the operation, in seconds.
- #[arg(long)]
- timeout: Option<u64>,
+pub struct CommonOperationArgs {
+ pub pool_id: String,
+ pub concurrency: usize,
+ pub timeout: Option<u64>,
}
#[tokio::main]
debug!("parsed CLI arguments: {cli:?}");
+ let common_args = CommonOperationArgs {
+ pool_id: cli.pool_id,
+ concurrency: cli.concurrency,
+ timeout: cli.timeout,
+ };
+
match cli.command {
Commands::Sync(args) => {
- let common = CommonOperationArgs {
- pool_id: args.pool_id,
- emails_file: args.emails_file,
- concurrency: 1, //args.concurrency,
- timeout: args.timeout,
- };
-
- run_sync(&common).await?;
+ run_sync(&common_args, &args).await?;
}
Commands::Add(args) => {
- run_add_groups(args).await?;
- }
- // Commands::Del(args) => {
- // let common = CommonOperationArgs {
- // pool_id: args.pool_id,
- // groups: args.groups,
- // emails_file: Some(args.emails_file),
- // concurrency: args.concurrency,
- // timeout: args.timeout,
- // };
+ // run_add_groups(args).await?;
+ println!("add");
+ } // Commands::Del(args) => {
+ // let common = CommonOperationArgs {
+ // pool_id: args.pool_id,
+ // groups: args.groups,
+ // emails_file: Some(args.emails_file),
+ // concurrency: args.concurrency,
+ // timeout: args.timeout,
+ // };
- // run_remove_groups(common).await?;
- // }
- _ => unimplemented!(),
+ // run_remove_groups(common).await?;
+ // }
}
Ok(())
/// The CSV format is:
/// ```text
/// username,email
-/// user1@example.com,user1@example.com
-/// user2@example.com,user2@example.com
+/// Google_123,user1@example.com
+/// Google_456,user2@example.com
/// ...
/// ```
///
/// Source of truth is Cognito: this command dumps all users from the pool.
///
/// Behavior:
+///
/// - Paginates over all Cognito users in the pool.
/// - Extracts the `username` field and the `email` attribute (if present).
/// - Writes the data as `username,email` to the given output file or stdout.
/// - Respects the optional `timeout` passed in `CommonOperationArgs`.
-pub async fn run_sync(args: &CommonOperationArgs) -> Result<()> {
+async fn run_sync(
+ common_args: &CommonOperationArgs,
+ cmd_args: &SyncArgs,
+) -> Result<()> {
info!(
- pool_id = %args.pool_id,
+ pool_id = %common_args.pool_id,
"Starting users sync from Cognito user pool"
);
let config = aws_config::load_from_env().await;
- // .context("failed to load AWS configuration")?;
let client = CognitoClient::new(&config);
- let timeout = args.timeout.map(Duration::from_secs);
+ let timeout = common_args.timeout.map(Duration::from_secs);
- let sync_future = sync_users_to_csv(&client, args);
+ let sync_future = sync_users_to_csv(&client, common_args, cmd_args);
if let Some(duration) = timeout {
match tokio::time::timeout(duration, sync_future).await {
}
info!("Users sync completed successfully");
+
Ok(())
}
-async fn run_add_groups(args: AddArgs) -> Result<()> {
- // info!(
- // pool_id = %args.pool_id,
- // emails_file = ?args.emails_file,
- // concurrency = ?args.concurrency,
- // timeout = ?args.timeout,
- // "add groups operation requested (not implemented yet)"
- // );
+// async fn run_add_groups(args: AddArgs) -> Result<()> {
+// // info!(
+// // pool_id = %args.pool_id,
+// // emails_file = ?args.emails_file,
+// // concurrency = ?args.concurrency,
+// // timeout = ?args.timeout,
+// // "add groups operation requested (not implemented yet)"
+// // );
- let config = aws_config::load_from_env().await;
- let client = CognitoClient::new(&config);
+// let config = aws_config::load_from_env().await;
+// let client = CognitoClient::new(&config);
- add_users_to_groups_from_files(
- &client,
- &args.pool_id,
- &args.sync_file,
- &args.emails_file,
- &args.groups,
- args.concurrency,
- )
- .await?;
+// add_users_to_groups_from_files(
+// &args.pool_id,
+// &args.sync_file,
+// &args.emails_file,
+// &args.groups,
+// args.concurrency,
+// )
+// .await?;
- Ok(())
-}
+// Ok(())
+// }
-#[allow(dead_code)]
-async fn run_remove_groups(args: CommonOperationArgs) -> Result<()> {
- info!(
- pool_id = %args.pool_id,
- emails_file = ?args.emails_file,
- concurrency = ?args.concurrency,
- timeout = ?args.timeout,
- "remove groups operation requested (not implemented yet)"
- );
+// #[allow(dead_code)]
+// async fn run_remove_groups(args: CommonOperationArgs) -> Result<()> {
+// info!(
+// pool_id = %args.pool_id,
+// emails_file = ?args.emails_file,
+// concurrency = ?args.concurrency,
+// timeout = ?args.timeout,
+// "remove groups operation requested (not implemented yet)"
+// );
- if let Some(seconds) = args.timeout {
- let _timeout = Duration::from_secs(seconds);
- debug!(?seconds, "remove operation timeout configured");
- }
+// if let Some(seconds) = args.timeout {
+// let _timeout = Duration::from_secs(seconds);
+// debug!(?seconds, "remove operation timeout configured");
+// }
- // TODO: implement remove-from-groups logic.
- Ok(())
-}
+// // TODO: implement remove-from-groups logic.
+// Ok(())
+// }
/// Fetch all users from Cognito and write `username,email` to a CSV destination.
///
/// Otherwise, the CSV is written to stdout.
pub(crate) async fn sync_users_to_csv(
client: &CognitoClient,
- args: &CommonOperationArgs,
+ common_args: &CommonOperationArgs,
+ cmd_args: &SyncArgs,
) -> Result<()> {
- let mut writer: Box<dyn AsyncWrite + Unpin + Send> =
- if let Some(path) = &args.emails_file {
- let file = File::create(path).await.with_context(|| {
- format!("failed to create output file at '{}'", path.display())
- })?;
- Box::new(file)
- } else {
- Box::new(io::stdout())
- };
+ let mut writer =
+ File::create(&cmd_args.sync_file).await.with_context(|| {
+ format!(
+ "failed to create output file at '{}'",
+ &cmd_args.sync_file.display()
+ )
+ })?;
+ // let mut writer: Box<dyn AsyncWrite + Unpin + Send> =
+ // if let Some(path) = &cmd_args.sync_file {
+ // let file = File::create(path).await.with_context(|| {
+ // format!("failed to create output file at '{}'", path.display())
+ // })?;
+ // Box::new(file)
+ // } else {
+ // Box::new(io::stdout())
+ // };
// CSV header
writer
loop {
let mut request = client
.list_users()
- .user_pool_id(&args.pool_id)
- // 60 is the documented default max page size for Cognito ListUsers.
+ .user_pool_id(&common_args.pool_id)
+ // https://docs.aws.amazon.com/cognito-user-identity-pools/latest/APIReference/API_ListUsers.html#CognitoUserPools-ListUsers-request-Limit
.limit(60);
if let Some(ref token) = pagination_token {
for user in response.users() {
let (username, email) = extract_username_and_email(user);
- // If you prefer to skip users without email, you can check `email.is_empty()`.
let line = format!("{username},{email}\n");
writer
.write_all(line.as_bytes())
fn extract_username_and_email(user: &UserType) -> (String, String) {
let username = user.username().unwrap_or_default().to_string();
+ // When the user has no `email` attribute, fall back to the username,
+ // on the assumption that the username is itself the e-mail address.
let email = user
.attributes()
.iter()
.find(|attr| attr.name() == "email")
.and_then(|attr| attr.value())
- .unwrap_or_default()
+ .unwrap_or(&username)
.to_string();
(username, email)
Ok(emails)
}
-/// Add a Cognito user to one or more groups using the Admin API.
-///
-/// This function assumes AWS credentials and permissions allow admin operations.
-pub async fn admin_add_user_to_groups(
- client: &CognitoClient,
- pool_id: &str,
- username: &str,
- groups: &[String],
-) -> Result<()> {
- for group in groups {
- info!(%username, %group, %pool_id, "adding user to Cognito group");
+// /// Add a Cognito user to one or more groups using the Admin API.
+// ///
+// /// This function assumes AWS credentials and permissions allow admin operations.
+// pub async fn admin_add_user_to_groups(
+// client: &CognitoClient,
+// pool_id: &str,
+// username: &str,
+// groups: &[String],
+// ) -> Result<()> {
+// for group in groups {
+// info!(%username, %group, %pool_id, "adding user to Cognito group");
- client
- .admin_add_user_to_group()
- .user_pool_id(pool_id)
- .username(username)
- .group_name(group)
- .send()
- .await
- .with_context(|| {
- format!(
- "failed to add user '{}' to group '{}'",
- username, group
- )
- })?;
+// client
+// .admin_add_user_to_group()
+// .user_pool_id(pool_id)
+// .username(username)
+// .group_name(group)
+// .send()
+// .await
+// .with_context(|| {
+// format!(
+// "failed to add user '{}' to group '{}'",
+// username, group
+// )
+// })?;
- info!(%username, %group, "user successfully added to group");
- }
+// info!(%username, %group, "user successfully added to group");
+// }
- Ok(())
-}
+// Ok(())
+// }
-/// High-level flow:
-/// 1. Load sync CSV into HashMap<email, username>.
-/// 2. Load target e-mails (one per line).
-/// 3. For each email, find username in HashMap.
-/// 4. Call `admin_add_user_to_groups` with concurrency limit.
-/// 5. Log success or failure for each email.
-///
-/// `sync_csv_path`: CSV generated by `sync` (username,email).
-/// `emails_list_path`: TXT file with one e-mail per line (users to be added).
-pub async fn add_users_to_groups_from_files(
- client: &CognitoClient,
- pool_id: &str,
- sync_csv_path: &Path,
- emails_list_path: &Path,
- groups: &[String],
- concurrency: usize,
-) -> Result<()> {
- let concurrency = std::cmp::max(1, concurrency);
+// /// High-level flow:
+// /// 1. Load sync CSV into HashMap<email, username>.
+// /// 2. Load target e-mails (one per line).
+// /// 3. For each email, find username in HashMap.
+// /// 4. Call `admin_add_user_to_groups` with concurrency limit.
+// /// 5. Log success or failure for each email.
+// ///
+// /// `sync_csv_path`: CSV generated by `sync` (username,email).
+// /// `emails_list_path`: TXT file with one e-mail per line (users to be added).
+// pub async fn add_users_to_groups_from_files(
+// client: &CognitoClient,
+// pool_id: &str,
+// sync_csv_path: &Path,
+// emails_list_path: &Path,
+// groups: &[String],
+// concurrency: usize,
+// ) -> Result<()> {
+// let concurrency = std::cmp::max(1, concurrency);
- // 1. Load sync CSV into map: email -> username
- let email_to_username = read_sync_file_to_map(sync_csv_path).await?;
- info!(
- total_entries = email_to_username.len(),
- "loaded sync map (email -> username)"
- );
+// // 1. Load sync CSV into map: email -> username
+// let email_to_username = read_sync_file_to_map(sync_csv_path).await?;
+// info!(
+// total_entries = email_to_username.len(),
+// "loaded sync map (email -> username)"
+// );
- // 2. Load target e-mails
- let emails = load_email_list(emails_list_path).await?;
- let total_emails = emails.len();
- info!(total_emails, "loaded target e-mail list");
+// // 2. Load target e-mails
+// let emails = load_email_list(emails_list_path).await?;
+// let total_emails = emails.len();
+// info!(total_emails, "loaded target e-mail list");
- let pb = Arc::new({
- let bar = ProgressBar::new(total_emails as u64);
- bar.set_style(
- ProgressStyle::with_template(
- "[{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} {msg}",
- )
- .unwrap_or_else(|_| ProgressStyle::default_bar()),
- );
- bar.set_message("processing users");
- bar
- });
+// let pb = Arc::new({
+// let bar = ProgressBar::new(total_emails as u64);
+// bar.set_style(
+// ProgressStyle::with_template(
+// "[{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} {msg}",
+// )
+// .unwrap_or_else(|_| ProgressStyle::default_bar()),
+// );
+// bar.set_message("processing users");
+// bar
+// });
- let semaphore = Arc::new(Semaphore::new(concurrency));
- let mut handles: Vec<JoinHandle<()>> = Vec::with_capacity(emails.len());
+// let semaphore = Arc::new(Semaphore::new(concurrency));
+// let mut handles: Vec<JoinHandle<()>> = Vec::with_capacity(emails.len());
- for email in emails {
- let username = match email_to_username.get(&email) {
- Some(u) => u.clone(),
- None => {
- error!(%email, "e-mail not found in sync map, skipping");
- continue;
- }
- };
+// for email in emails {
+// let username = match email_to_username.get(&email) {
+// Some(u) => u.clone(),
+// None => {
+// error!(%email, "e-mail not found in sync map, skipping");
+// continue;
+// }
+// };
- let permit = semaphore.clone().acquire_owned().await?;
- let client_clone = client.clone();
- let pool_id = pool_id.to_string();
- let groups = groups.to_vec();
- let email_clone = email.clone();
- let pb_clone = pb.clone();
+// let permit = semaphore.clone().acquire_owned().await?;
+// let client_clone = client.clone();
+// let pool_id = pool_id.to_string();
+// let groups = groups.to_vec();
+// let email_clone = email.clone();
+// let pb_clone = pb.clone();
- let handle = tokio::spawn(async move {
- let _permit = permit; // keep permit alive for the duration of this task
+// let handle = tokio::spawn(async move {
+// let _permit = permit; // keep permit alive for the duration of this task
- if let Err(err) = admin_add_user_to_groups(
- &client_clone,
- &pool_id,
- &username,
- &groups,
- )
- .await
- {
- error!(
- email = %email_clone,
- %username,
- error = ?err,
- "failed to add user to one or more groups"
- );
- } else {
- info!(
- email = %email_clone,
- %username,
- groups = ?groups,
- "user successfully processed for all groups"
- );
- }
+// if let Err(err) = admin_add_user_to_groups(
+// &client_clone,
+// &pool_id,
+// &username,
+// &groups,
+// )
+// .await
+// {
+// error!(
+// email = %email_clone,
+// %username,
+// error = ?err,
+// "failed to add user to one or more groups"
+// );
+// } else {
+// info!(
+// email = %email_clone,
+// %username,
+// groups = ?groups,
+// "user successfully processed for all groups"
+// );
+// }
- pb_clone.inc(1);
- });
+// pb_clone.inc(1);
+// });
- handles.push(handle);
- }
+// handles.push(handle);
+// }
- // Wait for all tasks to complete
- for handle in handles {
- // Ignore panics here, just surface as error logs.
- if let Err(join_err) = handle.await {
- error!(error = ?join_err, "join error while processing a user");
- }
- }
+// // Wait for all tasks to complete
+// for handle in handles {
+// // Ignore panics here, just surface as error logs.
+// if let Err(join_err) = handle.await {
+// error!(error = ?join_err, "join error while processing a user");
+// }
+// }
- pb.finish_with_message("done");
- info!("finished processing all users for add-groups operation");
- Ok(())
-}
+// pb.finish_with_message("done");
+// info!("finished processing all users for add-groups operation");
+// Ok(())
+// }