C.A.T./importUsers.py

179 lines
7.9 KiB
Python
Executable File

# Simple Cloudron bulk user import script
# Uses the Cloudron REST API v1
# Rev 1.4, 10/19/22, changes:
# - added local storage of access token including a limited means to check for validity
# - if mailboxes were supposed to be added and that failed, the tool will dump the missing users
# into a csv file formatted to import into the Cloudron Email management section.
# Rev 1.3, 10/16/22, changes:
# - added option to add passwords as part of the import
# Please note that this method obviously discloses passwords to the admin, so
# users should be encouraged to change their password on first login.
# - updated help output to include the optional mailbox and password options
# - fixed the revision date of the last commit
# Rev 1.2, 03/19/21, changes:
# - removed the she-bang because it was interfering with alternate python paths. Use python importUsers.py to run
# - changed the CSV file format. The person_id field is gone, user_groups is in. This change allows flexibility
# on group membership (arbitrary number of groups)
# - got rid of the hardcoded values for group membership, use the new user_groups field
# - cleaned up the print statements to use formatted strings instead of awkward concatenations
# Rev 1.1, 03/07/21, changes:
# - added additional code and option -m to automatically create mailboxes on the appropriate domain.
# This should only be used if the mailaddress in the CSV-file belongs to the current cloudron (sub)domain
# CSV file requirements:
# - csv separator is ';' not ',' (which makes it a non-standard csv file, actually)
# - fieldnames [first_name; last_name; email_address; sis_username; user_groups]
# - first three are self explanatory.
# Of these, only email_address is required by cloudron. the other fields must exist but can be empty
# - sis_username is turned into the optional username in cloudron, field must exist but can be empty
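# - user_groups is a comma-separated list of groups the user should be added to
# - password is an additional column that is only read (and then required) when the -p option is set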
# - example for a minimum CSV file:
# first_name;last_name;email_address;sis_username;user_groups
# Douglas;Adams;douglas.adams@example.com;d.adams;staff,writers
# ToDo:
# - groups from the user_group field must already exist. If a specified group is not available, an error is displayed.
# Future revisions could ask whether missing groups should be created.
# - could use some sanity checks for input (low priority, if you want to sabotage yourself, go ahead)
# - totpTokens are always requested, even if the user hasn't configured token 2FA (they probably should)
# - Check, if the supplied email address is in the same domain as the Cloudron if the -m option is set
import sys, getopt
import getpass
import requests
import csv
import os
import pickle
from json import loads
from datetime import datetime
from dateutil.relativedelta import relativedelta
def requestAccessToken(domain, username=''):
    """Return a Cloudron API access token for ``domain``.

    A previously obtained token is cached in the file ``.cat.pkl`` in the
    current working directory together with its domain and creation time.
    The cached token is reused when it belongs to the same domain and was
    created less than 365 days ago. Otherwise the user is prompted for
    credentials, a new token is requested from ``/api/v1/cloudron/login``
    and the cache file is rewritten.

    Args:
        domain: Host name of the Cloudron, without scheme.
        username: Login name; asked for interactively when empty.

    Returns:
        The access token string.

    Exits the process with status 2 when the login request is not answered
    with HTTP 200.
    """
    # Function-local import: the file's import section only provides ``datetime``.
    from datetime import timedelta

    cacheFile = ".cat.pkl"
    tokenLifetime = timedelta(days=365)

    # check for valid access token
    if os.path.exists(cacheFile):
        try:
            # NOTE: pickle executes code embedded in the file while loading, so
            # this is only safe as long as .cat.pkl is written by this script.
            with open(cacheFile, "rb") as cloudronAccessTokenFile:
                accessTokenDict = pickle.load(cloudronAccessTokenFile)
            # The token expires relative to its *creation* date; comparing the
            # creation date against a date in the future would always succeed.
            terminationDate = accessTokenDict['date'] + tokenLifetime
            if accessTokenDict['domain'] == domain and datetime.now() < terminationDate:
                return accessTokenDict['token']
        except (OSError, EOFError, pickle.UnpicklingError, AttributeError,
                ImportError, IndexError, KeyError, TypeError):
            # A truncated, foreign or otherwise unusable cache must not abort
            # the import; fall through and log in again.
            print(f"Ignoring unreadable access token cache {cacheFile}")

    # current token is either invalid or for another domain, create a new one
    apiBasePath = 'https://' + domain + "/api/v1"
    if username == '':
        username = input("Enter username for " + domain + ": ")
    password = getpass.getpass("Enter password for " + domain + ": ")
    totpToken = input("Enter current totpToken: ")
    a = requests.post(
        apiBasePath + '/cloudron/login',
        json={"username": username, "password": password, "totpToken": totpToken},
    )
    if a.status_code != requests.codes.ok:
        print(f"Error requesting access token: {a.status_code}")
        sys.exit(2)

    accessToken = a.json()['accessToken']
    accessTokenDict = {"domain": domain, "date": datetime.now(), "token": accessToken}
    try:
        with open(cacheFile, "wb+") as cloudronAccessTokenFile:
            pickle.dump(accessTokenDict, cloudronAccessTokenFile)
    except OSError as err:
        # Caching is a convenience only; the fresh token is still usable.
        print(f"Could not store access token in {cacheFile}: {err}")
    return accessToken
_USAGE = "importUsers.py -f <datafile> -d <domainname> -u <username> [-m] [-p]"


def _parseArguments(argv):
    """Parse the command line and return (domain, username, dataFilePath, addmailbox, addpassword)."""
    domain = ''
    username = ''
    dataFilePath = './users.csv'
    addmailbox = False
    addpassword = False
    try:
        opts, _ = getopt.getopt(
            argv,
            "hf:d:u:mp",
            ["help", "file=", "domain=", "username=", "add-mailbox", "add-password"],
        )
    except getopt.GetoptError:
        print(_USAGE)
        sys.exit(2)
    for opt, arg in opts:
        # --help is declared as a long option above, so it has to be handled too
        if opt in ('-h', '--help'):
            print(_USAGE)
            sys.exit()
        elif opt in ('-f', '--file'):
            dataFilePath = arg
        elif opt in ('-d', '--domain'):
            domain = arg
        elif opt in ('-u', '--username'):
            username = arg
        elif opt in ('-m', '--add-mailbox'):
            addmailbox = True
        elif opt in ('-p', '--add-password'):
            addpassword = True
    if domain == '':
        print("domainname must be provided, use the -d flag")
        sys.exit(2)
    return domain, username, dataFilePath, addmailbox, addpassword


def _assignGroups(apiBasePath, accessToken, userId, displayName, groupField, groups):
    """Add the user to every group named in the comma-separated ``groupField`` that exists in ``groups``."""
    # An empty field would otherwise yield the single bogus group name ''.
    groupNames = [name.strip() for name in (groupField or '').split(',') if name.strip()]
    knownGroupNames = []
    for curGroup in groupNames:
        if curGroup in groups:
            knownGroupNames.append(curGroup)
        else:
            print(f"Group {curGroup} doesn't exist on this Cloudron.")
    if not knownGroupNames:
        return
    userUrl = apiBasePath + '/users/' + userId + '/groups?access_token=' + accessToken
    userGroups = {"groupIds": [groups[name] for name in knownGroupNames]}
    p = requests.put(userUrl, json=userGroups)
    if p.status_code == requests.codes.no_content:
        print(f"User {displayName} added to groups {knownGroupNames}")
    else:
        print(f"Could not add user {displayName} to groups, error code {p.status_code}")


def _createMailbox(apiBasePath, accessToken, userId, displayName, entry, missingMailboxesFilename):
    """Create a mailbox owned by the user; on failure append the user to the missing-mailboxes CSV."""
    payload = {"name": entry['sis_username'], "ownerId": userId, "ownerType": "user"}
    userDomain = entry['email_address'].split('@')[-1]
    mailUrl = apiBasePath + '/mail/' + userDomain + '/mailboxes?access_token=' + accessToken
    p = requests.post(mailUrl, json=payload)
    if p.status_code == requests.codes.created:
        print(f"Mailbox for user {displayName} created as {entry['email_address']}")
        return
    print(f"Could not create mailbox for user {displayName}, error code {p.status_code}")
    # Report the file that is really written (it is created in the current directory).
    print(f"Dumping data to import file <{missingMailboxesFilename}>")
    with open(missingMailboxesFilename, "a+", newline="") as mailboxFile:
        writer = csv.writer(mailboxFile, delimiter=",")
        writer.writerow([entry['sis_username'], userDomain, userId, "user"])


def main(argv):
    """Import the users listed in a ';'-separated CSV file into a Cloudron.

    Args:
        argv: Command-line arguments without the program name. Recognised
            options: -h/--help, -f/--file <datafile> (default ./users.csv),
            -d/--domain <domainname> (required), -u/--username <username>,
            -m/--add-mailbox, -p/--add-password.

    For every CSV row a user is created via the REST API, added to the groups
    named in ``user_groups`` and, with -m, given a mailbox. Users whose
    mailbox could not be created are appended to ``mailbox_<datafile name>``
    in the current directory.

    Exits with status 2 on invalid options, a missing domain, a failed group
    query, or when -p is set but the CSV has no ``password`` column.
    """
    domain, username, dataFilePath, addmailbox, addpassword = _parseArguments(argv)
    apiBasePath = 'https://' + domain + "/api/v1"
    # Derived from the path actually in use, so it is also defined when -f was not given.
    missingMailboxesFilename = "mailbox_" + os.path.basename(dataFilePath)

    # get current access Token
    accessToken = requestAccessToken(domain, username)

    # read existing groups from the cloudron and build an abbreviated dictionary of name and id
    r = requests.get(apiBasePath + '/groups?access_token=' + accessToken)
    if r.status_code != requests.codes.ok:
        print(f"Could not read groups from {domain}, error code {r.status_code}")
        print("If the cached access token in .cat.pkl is no longer valid, delete that file and retry.")
        sys.exit(2)
    groups = {g['name']: g['id'] for g in r.json()['groups']}

    # read users from csv file and add to cloudron
    requestUrl = apiBasePath + '/users?access_token=' + accessToken
    # newline='' lets the csv module do its own line-ending handling
    with open(dataFilePath, newline='') as users:
        reader = csv.DictReader(users, delimiter=';')
        if addpassword and 'password' not in (reader.fieldnames or []):
            print(f"Option -p requires a 'password' column in {dataFilePath}")
            sys.exit(2)
        for entry in reader:
            # fill payload object with data from the csv; the name fields may be empty
            displayName = (entry['first_name'] + ' ' + entry['last_name']).strip()
            password = entry['password'] if addpassword else ""
            payload = {
                "email": entry['email_address'],
                "username": entry['sis_username'],
                "displayName": displayName,
                "password": password,
            }
            r = requests.post(requestUrl, json=payload)
            if r.status_code != requests.codes.created:
                print(f"User {displayName} could not be created, statuscode {r.status_code}")
                continue
            print(f"User {displayName} succesfully created")
            curUserId = r.json()['id']
            _assignGroups(apiBasePath, accessToken, curUserId, displayName, entry['user_groups'], groups)
            # If a mailbox was to be added, do that now (we have the id already)
            if addmailbox:
                _createMailbox(apiBasePath, accessToken, curUserId, displayName, entry, missingMailboxesFilename)
# Script entry point: pass the command-line arguments (without the program name) to main().
if __name__ == "__main__":
    cliArguments = sys.argv[1:]
    main(cliArguments)