commit - 159646d3ac75937fe0b83a2f97f52ce9418510eb
commit + 6116f744308210e8f722a63b08a5d714ec7b7c40
blob - ea8c4bf7f35f6f77f75d92ad8ce8349f6e81ddba
blob + ed7cf036e55b15528829f63580e1131885dabf1d
--- .gitignore
+++ .gitignore
/target
+.mail_sync
blob - c01caf510419879c5bc6f7cbec1102e9df4669f0
blob + ce03734d18a04a393b07ddb9f8f2fa8f46efcdff
--- Cargo.lock
+++ Cargo.lock
checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77"
[[package]]
+name = "lock_api"
+version = "0.4.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965"
+dependencies = [
+ "scopeguard",
+]
+
+[[package]]
name = "log"
version = "0.4.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e"
[[package]]
+name = "parking_lot"
+version = "0.12.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a"
+dependencies = [
+ "lock_api",
+ "parking_lot_core",
+]
+
+[[package]]
+name = "parking_lot_core"
+version = "0.9.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1"
+dependencies = [
+ "cfg-if",
+ "libc",
+ "redox_syscall",
+ "smallvec",
+ "windows-link",
+]
+
+[[package]]
name = "percent-encoding"
version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
[[package]]
+name = "redox_syscall"
+version = "0.5.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d"
+dependencies = [
+ "bitflags",
+]
+
+[[package]]
name = "regex"
version = "1.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
]
[[package]]
+name = "scopeguard"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
+
+[[package]]
name = "sct"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
"bytes",
"libc",
"mio",
+ "parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.6.1",
blob - 9ad96f231a7da19d7f9ad5121831913f19159994
blob + 2221b8157522e67a09fad9f0c67101f15f5aff01
--- Cargo.toml
+++ Cargo.toml
aws-config = { version = "1.8.10", features = ["behavior-version-latest"] }
aws-sdk-cognitoidentityprovider = "1.103.0"
clap = { version = "4.5.52", features = ["derive", "env"] }
-tokio = { version = "1.48.0", features = ["macros", "rt-multi-thread", "fs", "io-std"] }
+tokio = { version = "1.48.0", features = ["full"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
blob - 896804d1a7e2f3f51bae5bad0af3dfab9a297776
blob + aaac2ae2d7af614e1b350ea0fe82a5934b1aaba6
--- src/main.rs
+++ src/main.rs
// accept timeout
// accept poolid
-
mod helper;
+use std::collections::HashMap;
+use std::path::Path;
use std::path::PathBuf;
+use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result};
-use clap::{ArgAction, Parser, Subcommand};
-use tracing::{debug, info};
-use tracing_subscriber::EnvFilter;
-use tokio::fs::File;
-use tokio::io::{self, AsyncWrite, AsyncWriteExt};
use aws_sdk_cognitoidentityprovider::Client as CognitoClient;
use aws_sdk_cognitoidentityprovider::types::UserType;
+use clap::{ArgAction, Parser, Subcommand};
+use tokio::fs::File;
+use tokio::io::{self, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader};
+use tokio::sync::Semaphore;
+use tokio::task::JoinHandle;
+use tracing::{debug, error, info};
+use tracing_subscriber::EnvFilter;
-
const LONG_VERSION: &str = concat!(
env!("CARGO_PKG_NAME"),
" ",
Sync(SyncArgs),
/// Add users to one or more Cognito groups.
- Add(GroupOperationArgs),
+ Add(AddArgs),
/// Remove users from one or more Cognito groups.
Del(GroupOperationArgs),
}
+/// Arguments for the `add` operation.
+///
+/// High-level flow:
+/// - `sync_file` is the CSV produced by `sync` (username,email).
+/// - `emails_file` is a plain text file with one e-mail per line,
+/// representing the users that should be added to the given groups.
+/// - `groups` is the list of Cognito group names.
+/// - `pool_id` is the Cognito User Pool ID.
+/// - `concurrency` defines how many users are processed in parallel.
+#[derive(Debug, Parser)]
+pub struct AddArgs {
+ /// Cognito User Pool ID (e.g. us-east-1_XXXXXXXXX).
+ #[arg(long = "pool-id")]
+ pub pool_id: String,
+
+ /// CSV file generated by the `sync` command (username,email).
+ ///
+ /// This file is loaded into a HashMap<email, username> and used
+ /// as the source of truth for resolving usernames from e-mail.
+ #[arg(
+ long = "sync-file",
+ value_name = "CSV_PATH",
+ help = "CSV file produced by `sync` containing username,email columns"
+ )]
+ pub sync_file: PathBuf,
+
+ /// Text file containing one e-mail per line.
+ ///
+ /// Each e-mail will be normalized (trim + lowercase) and then
+ /// looked up in the sync CSV map to obtain the corresponding username.
+ #[arg(
+ long = "emails-file",
+ value_name = "TXT_PATH",
+ help = "Plain text file with one e-mail per line to be added to the groups"
+ )]
+ pub emails_file: PathBuf,
+
+ /// One or more Cognito group names.
+ ///
+ /// All resolved users will be added to every group listed here.
+ #[arg(
+ long = "group",
+ alias = "groups",
+ value_name = "GROUP",
+ num_args = 1..,
+ required = true
+ )]
+ pub groups: Vec<String>,
+
+ /// Maximum number of users to process concurrently.
+ ///
+ /// This controls how many users are handled in parallel when calling
+ /// the AdminAddUserToGroup API.
+ #[arg(long = "concurrency", value_name = "N", default_value_t = 4)]
+ pub concurrency: usize,
+
+ /// Global timeout for the operation, in seconds.
+ ///
+ /// When set, the entire add operation is bounded by this timeout.
+ #[arg(long = "timeout", value_name = "SECONDS")]
+ pub timeout: Option<u64>,
+}
+
/// Common arguments shared by group-based operations.
#[derive(clap::Args, Debug, Clone)]
pub struct CommonOperationArgs {
#[tokio::main]
async fn main() -> Result<()> {
+ // let users = read_sync_file_to_map(".mail_sync").await?;
+
+ // for (email, username) in &users {
+ // println!("{email} => {username}");
+ // }
let cli = Cli::parse();
init_tracing(cli.verbose);
run_sync(&common).await?;
}
- _ => unimplemented!(),
- // Commands::Add(args) => {
- // let common = CommonOperationArgs {
- // pool_id: args.pool_id,
- // groups: args.groups,
- // emails_file: Some(args.emails_file),
- // concurrency: args.concurrency,
- // timeout: args.timeout,
- // };
-
- // run_add_groups(common).await?;
- // }
+ Commands::Add(args) => {
+ run_add_groups(args).await?;
+ }
// Commands::Del(args) => {
// let common = CommonOperationArgs {
// pool_id: args.pool_id,
// run_remove_groups(common).await?;
// }
+ _ => unimplemented!(),
}
Ok(())
"Starting users sync from Cognito user pool"
);
- let config = aws_config::load_from_env()
- .await;
- // .context("failed to load AWS configuration")?;
+ let config = aws_config::load_from_env().await;
+ // .context("failed to load AWS configuration")?;
let client = CognitoClient::new(&config);
let timeout = args.timeout.map(Duration::from_secs);
Ok(())
}
-async fn run_add_groups(args: CommonOperationArgs) -> Result<()> {
- info!(
- pool_id = %args.pool_id,
- emails_file = ?args.emails_file,
- concurrency = ?args.concurrency,
- timeout = ?args.timeout,
- "add groups operation requested (not implemented yet)"
- );
+async fn run_add_groups(args: AddArgs) -> Result<()> {
+ // info!(
+ // pool_id = %args.pool_id,
+ // emails_file = ?args.emails_file,
+ // concurrency = ?args.concurrency,
+ // timeout = ?args.timeout,
+ // "add groups operation requested (not implemented yet)"
+ // );
- if let Some(seconds) = args.timeout {
- let _timeout = Duration::from_secs(seconds);
- debug!(?seconds, "add operation timeout configured");
- }
+ let config = aws_config::load_from_env().await;
+ let client = CognitoClient::new(&config);
- // TODO: implement add-to-groups logic.
+ add_users_to_groups_from_files(
+ &client,
+ &args.pool_id,
+ &args.sync_file,
+ &args.emails_file,
+ &args.groups,
+ args.concurrency,
+ )
+ .await?;
+
Ok(())
}
///
/// If `args.emails_file` is set, the CSV is written to that file.
/// Otherwise, the CSV is written to stdout.
-pub(crate)async fn sync_users_to_csv(client: &CognitoClient, args: &CommonOperationArgs) -> Result<()> {
- let mut writer: Box<dyn AsyncWrite + Unpin + Send> = if let Some(path) = &args.emails_file {
- let file = File::create(path)
- .await
- .with_context(|| format!("failed to create output file at '{}'", path.display()))?;
- Box::new(file)
- } else {
- Box::new(io::stdout())
- };
+pub(crate) async fn sync_users_to_csv(
+ client: &CognitoClient,
+ args: &CommonOperationArgs,
+) -> Result<()> {
+ let mut writer: Box<dyn AsyncWrite + Unpin + Send> =
+ if let Some(path) = &args.emails_file {
+ let file = File::create(path).await.with_context(|| {
+ format!("failed to create output file at '{}'", path.display())
+ })?;
+ Box::new(file)
+ } else {
+ Box::new(io::stdout())
+ };
// CSV header
writer
total_users += 1;
}
- pagination_token = response
- .pagination_token()
- .map(|token| token.to_owned());
+ pagination_token =
+ response.pagination_token().map(|token| token.to_owned());
if pagination_token.is_none() {
break;
(username, email)
}
+/// Read a Cognito sync CSV file ("username,email") and return a
+/// HashMap<email, username>.
+///
+/// Expected CSV format:
+/// ```text
+/// username,email
+/// johndoe,john@example.com
+/// alice,alice@example.com
+/// ...
+/// ```
+///
+/// Notes:
+/// - The first line is treated as a header and skipped.
+/// - Lines missing either username or email are ignored.
+/// - Email is lowercased and trimmed to allow normalized lookups.
+pub async fn read_sync_file_to_map<P: AsRef<Path>>(
+ path: P,
+) -> Result<HashMap<String, String>> {
+ let file = File::open(&path).await.with_context(|| {
+ format!("failed to open sync file '{}'", path.as_ref().display())
+ })?;
+
+ let reader = BufReader::new(file);
+ let mut lines = reader.lines();
+
+ let mut map = HashMap::<String, String>::new();
+
+ // Skip header: "username,email"
+ if let Some(line) = lines.next_line().await? {
+ let _header = line.trim();
+ // We can ignore header validation for now.
+ }
+
+ while let Some(line) = lines.next_line().await? {
+ let trimmed = line.trim();
+ if trimmed.is_empty() {
+ continue;
+ }
+
+ let parts: Vec<&str> = trimmed.split(',').collect();
+ if parts.len() < 2 {
+ // malformed line, skip
+ continue;
+ }
+
+ let username = parts[0].trim();
+ let email = parts[1].trim().to_lowercase();
+
+ if !email.is_empty() && !username.is_empty() {
+ map.insert(email, username.to_string());
+ }
+ }
+
+ Ok(map)
+}
+
+/// Load a plain-text file containing one e-mail per line.
+///
+/// Each line is trimmed and lowercased. Empty lines are skipped.
+pub async fn load_email_list<P: AsRef<Path>>(path: P) -> Result<Vec<String>> {
+ let file = File::open(&path).await.with_context(|| {
+ format!(
+ "failed to open e-mail list file '{}'",
+ path.as_ref().display()
+ )
+ })?;
+
+ let reader = BufReader::new(file);
+ let mut lines = reader.lines();
+ let mut emails = Vec::new();
+
+ while let Some(line) = lines.next_line().await? {
+ let email = line.trim().to_lowercase();
+ if email.is_empty() {
+ continue;
+ }
+ emails.push(email);
+ }
+
+ Ok(emails)
+}
+
+/// Add a Cognito user to one or more groups using the Admin API.
+///
+/// This function assumes AWS credentials and permissions allow admin operations.
+pub async fn admin_add_user_to_groups(
+ client: &CognitoClient,
+ pool_id: &str,
+ username: &str,
+ groups: &[String],
+) -> Result<()> {
+ for group in groups {
+ info!(%username, %group, %pool_id, "adding user to Cognito group");
+
+ client
+ .admin_add_user_to_group()
+ .user_pool_id(pool_id)
+ .username(username)
+ .group_name(group)
+ .send()
+ .await
+ .with_context(|| {
+ format!(
+ "failed to add user '{}' to group '{}'",
+ username, group
+ )
+ })?;
+
+ info!(%username, %group, "user successfully added to group");
+ }
+
+ Ok(())
+}
+
+/// High-level flow:
+/// 1. Load sync CSV into HashMap<email, username>.
+/// 2. Load target e-mails (one per line).
+/// 3. For each email, find username in HashMap.
+/// 4. Call `admin_add_user_to_groups` with concurrency limit.
+/// 5. Log success or failure for each email.
+///
+/// `sync_csv_path`: CSV generated by `sync` (username,email).
+/// `emails_list_path`: TXT file with one e-mail per line (users to be added).
+pub async fn add_users_to_groups_from_files(
+ client: &CognitoClient,
+ pool_id: &str,
+ sync_csv_path: &Path,
+ emails_list_path: &Path,
+ groups: &[String],
+ concurrency: usize,
+) -> Result<()> {
+ let concurrency = std::cmp::max(1, concurrency);
+
+ // 1. Load sync CSV into map: email -> username
+ let email_to_username = read_sync_file_to_map(sync_csv_path).await?;
+ info!(
+ total_entries = email_to_username.len(),
+ "loaded sync map (email -> username)"
+ );
+
+ // 2. Load target e-mails
+ let emails = load_email_list(emails_list_path).await?;
+ info!(total_emails = emails.len(), "loaded target e-mail list");
+
+ let semaphore = Arc::new(Semaphore::new(concurrency));
+ let mut handles: Vec<JoinHandle<()>> = Vec::with_capacity(emails.len());
+
+ for email in emails {
+ let username = match email_to_username.get(&email) {
+ Some(u) => u.clone(),
+ None => {
+ error!(%email, "e-mail not found in sync map, skipping");
+ continue;
+ }
+ };
+
+ let permit = semaphore.clone().acquire_owned().await?;
+ let client_clone = client.clone();
+ let pool_id = pool_id.to_string();
+ let groups = groups.to_vec();
+
+ let handle = tokio::spawn(async move {
+ let _permit = permit; // keep permit alive for the duration of this task
+
+ if let Err(err) = admin_add_user_to_groups(
+ &client_clone,
+ &pool_id,
+ &username,
+ &groups,
+ )
+ .await
+ {
+ error!(
+ %email,
+ %username,
+ error = ?err,
+ "failed to add user to one or more groups"
+ );
+ } else {
+ info!(
+ %email,
+ %username,
+ groups = ?groups,
+ "user successfully processed for all groups"
+ );
+ }
+ });
+
+ handles.push(handle);
+ }
+
+ // Wait for all tasks to complete
+ for handle in handles {
+ // Ignore panics here, just surface as error logs.
+ if let Err(join_err) = handle.await {
+ error!(error = ?join_err, "join error while processing a user");
+ }
+ }
+
+ info!("finished processing all users for add-groups operation");
+ Ok(())
+}