diff --git a/CHANGELOG.md b/CHANGELOG.md
index e722463..6d5ec4a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,8 +1,19 @@
 # VINCE Changelog
+CHANGELOG
+VINCE Coordination platform code
+
+Version 3.0.0 2024-04-10
+
+* Made the Vendor Association button track and populate the ticket ID and, if appropriate, the vendor name.
+* Upgraded `Django` to 4.2, since Django 3 is end-of-life
+* Restructured the code that prepares vendor table data on the VINCE Track case page to reduce load time
+* Refactored certain queries for the VINCE Track reports page in support of the long-term goal of reducing its load time
+
+
 Version 2.1.11 2024-03-14
 
-* Dependabot update recommendations: `cryptography` 41.0.6 to 42.0.4 and `django` from 3.2.23 to 3.2.24
+* Dependabot update recommendations: `cryptography` 41.0.6 to 42.0.4 and `django` from 3.2.23 to 3.2.24
 * Added code to ensure comments entered into comment box will be preserved when user uploads a file
 * Fixed filters above vendor table in vendor tab of case page to ensure consistency with data in vendor table
 * Added logging to make it easier to track user deactivation & MFA resetting processes
 
diff --git a/bakery/__init__.py b/bakery/__init__.py
index 0b9de91..2b7a685 100644
--- a/bakery/__init__.py
+++ b/bakery/__init__.py
@@ -1,4 +1,8 @@
-default_app_config = 'bakery.apps.BakeryConfig'
+import django
+
+if django.VERSION < (3, 2):
+    default_app_config = "bakery.apps.BakeryConfig"
+
 DEFAULT_GZIP_CONTENT_TYPES = (
     "application/atom+xml",
     "application/javascript",
@@ -31,5 +35,5 @@
     "text/vtt",
     "text/x-component",
     "text/x-cross-domain-policy",
-    "text/xml"
+    "text/xml",
 )
diff --git a/bakery/management/commands/build.py b/bakery/management/commands/build.py
index a988bfc..4a3bc1a 100644
--- a/bakery/management/commands/build.py
+++ b/bakery/management/commands/build.py
@@ -15,7 +15,7 @@
 # Filesystem
 from fs import path
 from fs import copy
-from django.utils.encoding import smart_text
+from django.utils.encoding import smart_str as smart_text
 
 # Pooling
 import multiprocessing
@@ -25,6 +25,7 @@
 from django.apps import apps
 from django.conf import settings
 from django.core import management
+
 try:
     from django.core.urlresolvers import get_callable
 except ImportError:
@@ -34,57 +35,54 @@
 
 # Logging
 import logging
+
 logger = logging.getLogger(__name__)
 
 
 class Command(BaseCommand):
-    help = 'Bake out a site as flat files in the build directory'
+    help = "Bake out a site as flat files in the build directory"
 
     build_unconfig_msg = "Build directory unconfigured. Set BUILD_DIR in settings.py or provide it with --build-dir"
 
     views_unconfig_msg = "Bakery views unconfigured. Set BAKERY_VIEWS in settings.py or provide a list as arguments."
 
     # regex to match against for gzipping. CSS, JS, JSON, HTML, etc.
-    gzip_file_match = getattr(
-        settings,
-        'GZIP_CONTENT_TYPES',
-        DEFAULT_GZIP_CONTENT_TYPES
-    )
+    gzip_file_match = getattr(settings, "GZIP_CONTENT_TYPES", DEFAULT_GZIP_CONTENT_TYPES)
 
     def add_arguments(self, parser):
-        parser.add_argument('view_list', nargs='*', type=str, default=[])
+        parser.add_argument("view_list", nargs="*", type=str, default=[])
         parser.add_argument(
             "--build-dir",
             action="store",
             dest="build_dir",
-            default='',
+            default="",
             help="Specify the path of the build directory. \
-Will use settings.BUILD_DIR by default."
+Will use settings.BUILD_DIR by default.",
         )
         parser.add_argument(
             "--keep-build-dir",
             action="store_true",
             dest="keep_build_dir",
             default=False,
-            help="Skip initializing the build directory before building files."
+ help="Skip initializing the build directory before building files.", ) parser.add_argument( "--skip-static", action="store_true", dest="skip_static", default=False, - help="Skip collecting the static files when building." + help="Skip collecting the static files when building.", ) parser.add_argument( "--skip-media", action="store_true", dest="skip_media", default=False, - help="Skip collecting the media files when building." + help="Skip collecting the media files when building.", ) parser.add_argument( "--pooling", action="store_true", dest="pooling", default=False, - help=("Pool builds to run concurrently rather than running them one by one.") + help=("Pool builds to run concurrently rather than running them one by one."), ) def handle(self, *args, **options): @@ -118,14 +116,14 @@ def set_options(self, *args, **options): """ Configure a few global options before things get going. """ - self.verbosity = int(options.get('verbosity', 1)) + self.verbosity = int(options.get("verbosity", 1)) # Figure out what build directory to use if options.get("build_dir"): self.build_dir = options.get("build_dir") settings.BUILD_DIR = self.build_dir else: - if not hasattr(settings, 'BUILD_DIR'): + if not hasattr(settings, "BUILD_DIR"): raise CommandError(self.build_unconfig_msg) self.build_dir = settings.BUILD_DIR @@ -144,15 +142,15 @@ def set_options(self, *args, **options): self.fs.makedirs(self.build_dir) # Figure out what views we'll be using - if options.get('view_list'): - self.view_list = options['view_list'] + if options.get("view_list"): + self.view_list = options["view_list"] else: - if not hasattr(settings, 'BAKERY_VIEWS'): + if not hasattr(settings, "BAKERY_VIEWS"): raise CommandError(self.views_unconfig_msg) self.view_list = settings.BAKERY_VIEWS # Are we pooling? - self.pooling = options.get('pooling') + self.pooling = options.get("pooling") def init_build_dir(self): """ @@ -174,20 +172,13 @@ def build_static(self, *args, **options): logger.debug("Building static directory") if self.verbosity > 1: self.stdout.write("Building static directory") - management.call_command( - "collectstatic", - interactive=False, - verbosity=0 - ) + management.call_command("collectstatic", interactive=False, verbosity=0) # Set the target directory inside the filesystem. - target_dir = path.join( - self.build_dir, - settings.STATIC_URL.lstrip('/') - ) + target_dir = path.join(self.build_dir, settings.STATIC_URL.lstrip("/")) target_dir = smart_text(target_dir) if os.path.exists(self.static_root) and settings.STATIC_URL: - if getattr(settings, 'BAKERY_GZIP', False): + if getattr(settings, "BAKERY_GZIP", False): self.copytree_and_gzip(self.static_root, target_dir) # if gzip isn't enabled, just copy the tree straight over else: @@ -197,15 +188,15 @@ def build_static(self, *args, **options): # If they exist in the static directory, copy the robots.txt # and favicon.ico files down to the root so they will work # on the live website. 
- robots_src = path.join(target_dir, 'robots.txt') + robots_src = path.join(target_dir, "robots.txt") if self.fs.exists(robots_src): - robots_target = path.join(self.build_dir, 'robots.txt') + robots_target = path.join(self.build_dir, "robots.txt") logger.debug("Copying {}{} to {}{}".format(self.fs_name, robots_src, self.fs_name, robots_target)) self.fs.copy(robots_src, robots_target) - favicon_src = path.join(target_dir, 'favicon.ico') + favicon_src = path.join(target_dir, "favicon.ico") if self.fs.exists(favicon_src): - favicon_target = path.join(self.build_dir, 'favicon.ico') + favicon_target = path.join(self.build_dir, "favicon.ico") logger.debug("Copying {}{} to {}{}".format(self.fs_name, favicon_src, self.fs_name, favicon_target)) self.fs.copy(favicon_src, favicon_target) @@ -217,10 +208,9 @@ def build_media(self): if self.verbosity > 1: self.stdout.write("Building media directory") if os.path.exists(self.media_root) and settings.MEDIA_URL: - target_dir = path.join(self.build_dir, settings.MEDIA_URL.lstrip('/')) + target_dir = path.join(self.build_dir, settings.MEDIA_URL.lstrip("/")) logger.debug("Copying {}{} to {}{}".format("osfs://", self.media_root, self.fs_name, target_dir)) copy.copy_dir("osfs:///", smart_text(self.media_root), self.fs, smart_text(target_dir)) - def get_view_instance(self, view): """ @@ -249,7 +239,7 @@ def copytree_and_gzip(self, source_dir, target_dir): # Figure out what we're building... build_list = [] # Walk through the source directory... - for (dirpath, dirnames, filenames) in os.walk(source_dir): + for dirpath, dirnames, filenames in os.walk(source_dir): for f in filenames: # Figure out what is going where source_path = os.path.join(dirpath, f) @@ -261,7 +251,7 @@ def copytree_and_gzip(self, source_dir, target_dir): logger.debug("Gzipping {} files".format(len(build_list))) # Build em all - if not getattr(self, 'pooling', False): + if not getattr(self, "pooling", False): [self.copyfile_and_gzip(*u) for u in build_list] else: cpu_count = multiprocessing.cpu_count() @@ -299,48 +289,37 @@ def copyfile_and_gzip(self, source_path, target_path): # If it isn't a file want to gzip... if content_type not in self.gzip_file_match: # just copy it to the target. - logger.debug("Copying {}{} to {}{} because its filetype isn't on the whitelist".format( - "osfs://", - source_path, - self.fs_name, - target_path - )) + logger.debug( + "Copying {}{} to {}{} because its filetype isn't on the whitelist".format( + "osfs://", source_path, self.fs_name, target_path + ) + ) copy.copy_file("osfs:///", smart_text(source_path), self.fs, smart_text(target_path)) # # if the file is already gzipped - elif encoding == 'gzip': - logger.debug("Copying {}{} to {}{} because it's already gzipped".format( - "osfs://", - source_path, - self.fs_name, - target_path - )) + elif encoding == "gzip": + logger.debug( + "Copying {}{} to {}{} because it's already gzipped".format( + "osfs://", source_path, self.fs_name, target_path + ) + ) copy.copy_file("osfs:///", smart_text(source_path), self.fs, smart_text(target_path)) # If it is one we want to gzip... else: # ... let the world know ... 
- logger.debug("Gzipping {}{} to {}{}".format( - "osfs://", - source_path, - self.fs_name, - target_path - )) + logger.debug("Gzipping {}{} to {}{}".format("osfs://", source_path, self.fs_name, target_path)) # Open up the source file from the OS - with open(source_path, 'rb') as source_file: + with open(source_path, "rb") as source_file: # Write GZIP data to an in-memory buffer data_buffer = six.BytesIO() - kwargs = dict( - filename=path.basename(target_path), - mode='wb', - fileobj=data_buffer - ) + kwargs = dict(filename=path.basename(target_path), mode="wb", fileobj=data_buffer) if float(sys.version[:3]) >= 2.7: - kwargs['mtime'] = 0 + kwargs["mtime"] = 0 with gzip.GzipFile(**kwargs) as f: f.write(six.binary_type(source_file.read())) # Write that buffer out to the filesystem - with self.fs.open(smart_text(target_path), 'wb') as outfile: + with self.fs.open(smart_text(target_path), "wb") as outfile: outfile.write(data_buffer.getvalue()) outfile.close() diff --git a/bakery/management/commands/publish.py b/bakery/management/commands/publish.py index f32e08e..0d623c7 100644 --- a/bakery/management/commands/publish.py +++ b/bakery/management/commands/publish.py @@ -7,19 +7,15 @@ from django.conf import settings from multiprocessing.pool import ThreadPool from bakery import DEFAULT_GZIP_CONTENT_TYPES -from bakery.management.commands import ( - BasePublishCommand, - get_s3_client, - get_bucket_page -) +from bakery.management.commands import BasePublishCommand, get_s3_client, get_bucket_page -# Filesystem +# Filesystem import fs from fs import path from fs import copy from fs_s3fs import S3FS from fs.copy import copy_file -from django.utils.encoding import smart_text +from django.utils.encoding import smart_str as smart_text from django.apps import apps @@ -32,16 +28,19 @@ logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) + class Command(BasePublishCommand): help = "Syncs the build directory with Amazon s3 bucket" # Default permissions for the files published to s3 - DEFAULT_ACL = 'public-read' + DEFAULT_ACL = "public-read" # Error messages we might use below build_missing_msg = "Build directory does not exist. Cannot publish something before you build it." build_unconfig_msg = "Build directory unconfigured. Set BUILD_DIR in settings.py or provide it with --build-dir" - bucket_unconfig_msg = "Bucket unconfigured. Set AWS_BUCKET_NAME in settings.py or provide it with --aws-bucket-name" + bucket_unconfig_msg = ( + "Bucket unconfigured. Set AWS_BUCKET_NAME in settings.py or provide it with --aws-bucket-name" + ) views_unconfig_msg = "Bakery views unconfigured. Set BAKERY_VIEWS in settings.py or provide a list as arguments." def add_arguments(self, parser): @@ -49,57 +48,57 @@ def add_arguments(self, parser): "--build-dir", action="store", dest="build_dir", - default='', - help="Specify the path of the build directory. Will use settings.BUILD_DIR by default." + default="", + help="Specify the path of the build directory. Will use settings.BUILD_DIR by default.", ) parser.add_argument( "--aws-bucket-name", action="store", dest="aws_bucket_name", - default='', - help="Specify the AWS bucket to sync with. Will use settings.AWS_BUCKET_NAME by default." + default="", + help="Specify the AWS bucket to sync with. Will use settings.AWS_BUCKET_NAME by default.", ) parser.add_argument( "--aws-bucket-prefix", action="store", dest="aws_bucket_prefix", - default='', - help="Specify a prefix for the AWS bucket keys to sync with. None by default." 
+ default="", + help="Specify a prefix for the AWS bucket keys to sync with. None by default.", ) parser.add_argument( "--force", action="store_true", dest="force", default="", - help="Force a republish of all items in the build directory" + help="Force a republish of all items in the build directory", ) parser.add_argument( "--dry-run", action="store_true", dest="dry_run", default="", - help="Display the output of what would have been uploaded removed, but without actually publishing." + help="Display the output of what would have been uploaded removed, but without actually publishing.", ) parser.add_argument( "--no-delete", action="store_true", dest="no_delete", default=False, - help=("Keep files in S3, even if they do not exist in the build directory.") + help=("Keep files in S3, even if they do not exist in the build directory."), ) parser.add_argument( "--no-pooling", action="store_true", dest="no_pooling", default=False, - help=("Run uploads one by one rather than pooling them to run concurrently.") + help=("Run uploads one by one rather than pooling them to run concurrently."), ) parser.add_argument( "--s3fs", action="store_true", dest="s3fs", default=False, - help=("Use s3fs to do copy, which is required for certain filesystems (like MemoryFS)") + help=("Use s3fs to do copy, which is required for certain filesystems (like MemoryFS)"), ) def handle(self, *args, **options): @@ -162,25 +161,22 @@ def handle(self, *args, **options): logger.debug("Deleting %s keys" % self.deleted_files) if self.verbosity > 0: self.stdout.write("Deleting %s keys" % self.deleted_files) - self.batch_delete_s3_objects( - self.deleted_file_list, - self.aws_bucket_name - ) + self.batch_delete_s3_objects(self.deleted_file_list, self.aws_bucket_name) # Run any post publish hooks on the views - if not hasattr(settings, 'BAKERY_VIEWS'): + if not hasattr(settings, "BAKERY_VIEWS"): raise CommandError(self.views_unconfig_msg) for view_str in settings.BAKERY_VIEWS: view = get_callable(view_str)() - if hasattr(view, 'post_publish'): - getattr(view, 'post_publish')(self.bucket) + if hasattr(view, "post_publish"): + getattr(view, "post_publish")(self.bucket) # We're finished, print the final output elapsed_time = time.time() - self.start_time msg = "Publish completed, %d uploaded and %d deleted files in %.2f seconds" % ( self.uploaded_files, self.deleted_files, - elapsed_time + elapsed_time, ) logger.info(msg) if self.verbosity > 0: @@ -195,59 +191,55 @@ def set_options(self, options): """ Configure all the many options we'll need to make this happen. """ - self.verbosity = int(options.get('verbosity')) + self.verbosity = int(options.get("verbosity")) # Will we be gzipping? - self.gzip = getattr(settings, 'BAKERY_GZIP', False) + self.gzip = getattr(settings, "BAKERY_GZIP", False) # And if so what content types will we be gzipping? - self.gzip_content_types = getattr( - settings, - 'GZIP_CONTENT_TYPES', - DEFAULT_GZIP_CONTENT_TYPES - ) + self.gzip_content_types = getattr(settings, "GZIP_CONTENT_TYPES", DEFAULT_GZIP_CONTENT_TYPES) # What ACL (i.e. security permissions) will be giving the files on S3? - self.acl = getattr(settings, 'DEFAULT_ACL', self.DEFAULT_ACL) + self.acl = getattr(settings, "DEFAULT_ACL", self.DEFAULT_ACL) # Should we set cache-control headers? - self.cache_control = getattr(settings, 'BAKERY_CACHE_CONTROL', {}) + self.cache_control = getattr(settings, "BAKERY_CACHE_CONTROL", {}) # If the user specifies a build directory... - if options.get('build_dir'): + if options.get("build_dir"): # ... 
validate that it is good. - #if not os.path.exists(options.get('build_dir')): + # if not os.path.exists(options.get('build_dir')): # raise CommandError(self.build_missing_msg) # Go ahead and use it self.build_dir = options.get("build_dir") # If the user does not specify a build dir... else: # Check if it is set in settings.py - if not hasattr(settings, 'BUILD_DIR'): + if not hasattr(settings, "BUILD_DIR"): raise CommandError(self.build_unconfig_msg) # Then make sure it actually exists - #if not os.path.exists(settings.BUILD_DIR): + # if not os.path.exists(settings.BUILD_DIR): # raise CommandError(self.build_missing_msg) # Go ahead and use it self.build_dir = settings.BUILD_DIR self.build_dir = smart_text(self.build_dir) - # Connect the BUILD_DIR with our filesystem backend + # Connect the BUILD_DIR with our filesystem backend self.app = apps.get_app_config("bakery") self.fs = self.app.filesystem self.fs_name = self.app.filesystem_name - # If the build dir doesn't exist make it + # If the build dir doesn't exist make it if not self.fs.exists(self.build_dir): raise CommandError(self.build_missing_msg) - + # If the user provides a bucket name, use that. if options.get("aws_bucket_name"): self.aws_bucket_name = options.get("aws_bucket_name") else: # Otherwise try to find it the settings - if not hasattr(settings, 'AWS_BUCKET_NAME'): + if not hasattr(settings, "AWS_BUCKET_NAME"): raise CommandError(self.bucket_unconfig_msg) self.aws_bucket_name = settings.AWS_BUCKET_NAME @@ -255,22 +247,22 @@ def set_options(self, options): self.aws_bucket_prefix = options.get("aws_bucket_prefix") # If the user sets the --force option - if options.get('force'): + if options.get("force"): self.force_publish = True else: self.force_publish = False # set the --dry-run option - if options.get('dry_run'): + if options.get("dry_run"): self.dry_run = True if self.verbosity > 0: logger.info("Executing with the --dry-run option set.") else: self.dry_run = False - self.no_delete = options.get('no_delete') - self.no_pooling = options.get('no_pooling') - self.s3fs = options.get('s3fs') + self.no_delete = options.get("no_delete") + self.no_pooling = options.get("no_pooling") + self.s3fs = options.get("s3fs") def get_bucket_file_list(self): """ @@ -279,13 +271,11 @@ def get_bucket_file_list(self): """ logger.debug("Retrieving bucket object list") - paginator = self.s3_client.get_paginator('list_objects') - options = { - 'Bucket': self.aws_bucket_name - } + paginator = self.s3_client.get_paginator("list_objects") + options = {"Bucket": self.aws_bucket_name} if self.aws_bucket_prefix: logger.debug("Adding prefix {} to bucket list as a filter".format(self.aws_bucket_prefix)) - options['Prefix'] = self.aws_bucket_prefix + options["Prefix"] = self.aws_bucket_prefix page_iterator = paginator.paginate(**options) obj_dict = {} @@ -300,13 +290,10 @@ def get_local_file_list(self): absolute paths to files. 
""" file_list = [] - for (dirpath, dirnames, filenames) in self.fs.walk(self.build_dir): + for dirpath, dirnames, filenames in self.fs.walk(self.build_dir): for fname in filenames: - - local_key = path.combine( - path.frombase(path.abspath(self.build_dir), dirpath), - fname.name - ) + + local_key = path.combine(path.frombase(path.abspath(self.build_dir), dirpath), fname.name) local_key = path.relpath(local_key) file_list.append(smart_text(local_key)) return file_list @@ -320,10 +307,11 @@ def sync_with_s3(self): self.update_list = [] # Figure out which files need to be updated and upload all these files - logger.debug("Comparing {} local files with {} bucket files".format( - len(self.local_file_list), - len(self.s3_obj_dict.keys()) - )) + logger.debug( + "Comparing {} local files with {} bucket files".format( + len(self.local_file_list), len(self.s3_obj_dict.keys()) + ) + ) if self.no_pooling: [self.compare_local_file(f) for f in self.local_file_list] else: @@ -344,7 +332,7 @@ def get_md5(self, filename): """ Returns the md5 checksum of the provided file name. """ - with self.fs.open(filename, 'rb') as f: + with self.fs.open(filename, "rb") as f: m = hashlib.md5(f.read()) return m.hexdigest() @@ -356,7 +344,7 @@ def get_multipart_md5(self, filename, chunk_size=8 * 1024 * 1024): """ # Loop through the file contents ... md5s = [] - with self.fs.open(filename, 'rb') as fp: + with self.fs.open(filename, "rb") as fp: while True: # Break it into chunks data = fp.read(chunk_size) @@ -386,7 +374,7 @@ def compare_local_file(self, file_key): """ # Where is the file? file_path = path.combine(self.build_dir, file_key) - #file_path = file_key + # file_path = file_key # If we're in force_publish mode just add it if self.force_publish: self.update_list.append((file_key, file_path)) @@ -397,7 +385,7 @@ def compare_local_file(self, file_key): if file_key in self.s3_obj_dict: # Get the md5 stored in Amazon's header - s3_md5 = self.s3_obj_dict[file_key].get('ETag').strip('"').strip("'") + s3_md5 = self.s3_obj_dict[file_key].get("ETag").strip('"').strip("'") # If there is a multipart ETag on S3, compare that to our local file after its chunked up. # We are presuming this file was uploaded in multiple parts. 
@@ -437,25 +425,22 @@ def upload_to_s3(self, key, filename): Set the content type and gzip headers if applicable and upload the item to S3 """ - extra_args = {'ACL': self.acl} + extra_args = {"ACL": self.acl} # determine the mimetype of the file guess = mimetypes.guess_type(filename) content_type = guess[0] encoding = guess[1] if content_type: - extra_args['ContentType'] = content_type + extra_args["ContentType"] = content_type # add the gzip headers, if necessary - if (self.gzip and content_type in self.gzip_content_types) or encoding == 'gzip': - extra_args['ContentEncoding'] = 'gzip' + if (self.gzip and content_type in self.gzip_content_types) or encoding == "gzip": + extra_args["ContentEncoding"] = "gzip" # add the cache-control headers if necessary if content_type in self.cache_control: - extra_args['CacheControl'] = ''.join(( - 'max-age=', - str(self.cache_control[content_type]) - )) + extra_args["CacheControl"] = "".join(("max-age=", str(self.cache_control[content_type]))) # access and write the contents from the file if not self.dry_run: @@ -467,7 +452,7 @@ def upload_to_s3(self, key, filename): try: copy_file(self.fs, filename, s3fs, key) except fs.errors.ResourceNotFound as e: - #s3fs won't make directories if it doesn't exist, so have to do it explicitly + # s3fs won't make directories if it doesn't exist, so have to do it explicitly s3fs.makedirs(path.dirname(key)) copy_file(self.fs, filename, s3fs, key) else: diff --git a/bakery/static_urls.py b/bakery/static_urls.py index 0841393..f7ee711 100644 --- a/bakery/static_urls.py +++ b/bakery/static_urls.py @@ -1,12 +1,8 @@ from django.conf import settings -from django.conf.urls import url +from django.urls import include, re_path from bakery.static_views import serve urlpatterns = [ - url(r"^(.*)$", serve, { - "document_root": settings.BUILD_DIR, - 'show_indexes': True, - 'default': 'index.html' - }), + re_path(r"^(.*)$", serve, {"document_root": settings.BUILD_DIR, "show_indexes": True, "default": "index.html"}), ] diff --git a/bakery/static_views.py b/bakery/static_views.py index 8b8cf20..43a5be4 100644 --- a/bakery/static_views.py +++ b/bakery/static_views.py @@ -2,6 +2,7 @@ Views and functions for serving static files. These are only to be used during development, and SHOULD NOT be used in a production setting. """ + import django import mimetypes import os @@ -15,9 +16,10 @@ from django.template import Template, Context, TemplateDoesNotExist from django.utils.http import http_date, parse_http_date from django.conf import settings -from django.utils.http import is_same_domain, is_safe_url +from django.utils.http import is_same_domain, url_has_allowed_host_and_scheme as is_safe_url + -def serve(request, path, document_root=None, show_indexes=False, default=''): +def serve(request, path, document_root=None, show_indexes=False, default=""): """ Serve static files below a given point in the directory structure. @@ -40,9 +42,9 @@ def serve(request, path, document_root=None, show_indexes=False, default=''): # Clean up given path to only allow serving files below document_root. path = posixpath.normpath(unquote(path)) - path = path.lstrip('/') - newpath = '' - for part in path.split('/'): + path = path.lstrip("/") + newpath = "" + for part in path.split("/"): if not part: # Strip empty path components. continue @@ -51,9 +53,9 @@ def serve(request, path, document_root=None, show_indexes=False, default=''): if part in (os.curdir, os.pardir): # Strip '.' and '..' in path. 
continue - newpath = os.path.join(newpath, part).replace('\\', '/') + newpath = os.path.join(newpath, part).replace("\\", "/") if newpath and path != newpath: - if is_safe_url(newpath,set(settings.ALLOWED_HOSTS),True): + if is_safe_url(newpath, set(settings.ALLOWED_HOSTS), True): return HttpResponseRedirect(newpath) else: raise Http404("Invalid or Incorrect path found") @@ -70,14 +72,15 @@ def serve(request, path, document_root=None, show_indexes=False, default=''): raise Http404('"%s" does not exist' % fullpath) # Respect the If-Modified-Since header. statobj = os.stat(fullpath) - mimetype = mimetypes.guess_type(fullpath)[0] or 'application/octet-stream' - if not was_modified_since(request.META.get('HTTP_IF_MODIFIED_SINCE'), - statobj[stat.ST_MTIME], statobj[stat.ST_SIZE]): + mimetype = mimetypes.guess_type(fullpath)[0] or "application/octet-stream" + if not was_modified_since( + request.META.get("HTTP_IF_MODIFIED_SINCE"), statobj[stat.ST_MTIME], statobj[stat.ST_SIZE] + ): if django.VERSION > (1, 6): return HttpResponseNotModified(content_type=mimetype) else: return HttpResponseNotModified(mimetype=mimetype) - contents = open(fullpath, 'rb').read() + contents = open(fullpath, "rb").read() if django.VERSION > (1, 6): response = HttpResponse(contents, content_type=mimetype) else: @@ -114,25 +117,21 @@ def serve(request, path, document_root=None, show_indexes=False, default=''): def directory_index(path, fullpath): try: - t = loader.select_template([ - 'static/directory_index.html', - 'static/directory_index' - ]) + t = loader.select_template(["static/directory_index.html", "static/directory_index"]) except TemplateDoesNotExist: - t = Template( - DEFAULT_DIRECTORY_INDEX_TEMPLATE, - name='Default directory index template' - ) + t = Template(DEFAULT_DIRECTORY_INDEX_TEMPLATE, name="Default directory index template") files = [] for f in os.listdir(fullpath): - if not f.startswith('.'): + if not f.startswith("."): if os.path.isdir(os.path.join(fullpath, f)): - f += '/' + f += "/" files.append(f) - c = Context({ - 'directory': path + '/', - 'file_list': files, - }) + c = Context( + { + "directory": path + "/", + "file_list": files, + } + ) return HttpResponse(t.render(c)) @@ -150,8 +149,7 @@ def was_modified_since(header=None, mtime=0, size=0): try: if header is None: raise ValueError - matches = re.match(r"^([^;]+)(; length=([0-9]+))?$", header, - re.IGNORECASE) + matches = re.match(r"^([^;]+)(; length=([0-9]+))?$", header, re.IGNORECASE) header_mtime = parse_http_date(matches.group(1)) header_len = matches.group(3) if header_len and int(header_len) != size: diff --git a/bakery/views/base.py b/bakery/views/base.py index 3ac34a1..819fb29 100644 --- a/bakery/views/base.py +++ b/bakery/views/base.py @@ -14,11 +14,12 @@ from fs import path from django.apps import apps from django.conf import settings -from django.utils.encoding import smart_text +from django.utils.encoding import smart_str as smart_text from bakery import DEFAULT_GZIP_CONTENT_TYPES from django.test.client import RequestFactory from bakery.management.commands import get_s3_client from django.views.generic import RedirectView, TemplateView + try: from django.core.urlresolvers import reverse, NoReverseMatch except ImportError: # Starting with Django 2.0, django.core.urlresolvers does not exist anymore @@ -30,6 +31,7 @@ class BuildableMixin(object): """ Common methods we will use in buildable views. 
""" + fs_name = apps.get_app_config("bakery").filesystem_name fs = apps.get_app_config("bakery").filesystem @@ -74,7 +76,7 @@ def write_file(self, target_path, html): Writes out the provided HTML to the provided path. """ logger.debug("Building to {}{}".format(self.fs_name, target_path)) - with self.fs.open(smart_text(target_path), 'wb') as outfile: + with self.fs.open(smart_text(target_path), "wb") as outfile: outfile.write(six.binary_type(html)) outfile.close() @@ -84,14 +86,10 @@ def is_gzippable(self, path): for gzipping. """ # First check if gzipping is allowed by the global setting - if not getattr(settings, 'BAKERY_GZIP', False): + if not getattr(settings, "BAKERY_GZIP", False): return False # Then check if the content type of this particular file is gzippable - whitelist = getattr( - settings, - 'GZIP_CONTENT_TYPES', - DEFAULT_GZIP_CONTENT_TYPES - ) + whitelist = getattr(settings, "GZIP_CONTENT_TYPES", DEFAULT_GZIP_CONTENT_TYPES) return mimetypes.guess_type(path)[0] in whitelist def gzip_file(self, target_path, html): @@ -109,18 +107,14 @@ def gzip_file(self, target_path, html): # Write GZIP data to an in-memory buffer data_buffer = six.BytesIO() - kwargs = dict( - filename=path.basename(target_path), - mode='wb', - fileobj=data_buffer - ) + kwargs = dict(filename=path.basename(target_path), mode="wb", fileobj=data_buffer) if float(sys.version[:3]) >= 2.7: - kwargs['mtime'] = 0 + kwargs["mtime"] = 0 with gzip.GzipFile(**kwargs) as f: f.write(six.binary_type(html)) # Write that buffer out to the filesystem - with self.fs.open(smart_text(target_path), 'wb') as outfile: + with self.fs.open(smart_text(target_path), "wb") as outfile: outfile.write(data_buffer.getvalue()) outfile.close() @@ -139,6 +133,7 @@ class BuildableTemplateView(TemplateView, BuildableMixin): template_name: The name of the template you would like Django to render. """ + @property def build_method(self): return self.build @@ -152,15 +147,16 @@ def build(self): self.build_file(path, self.get_content()) def get_build_path(self): - return six.text_type(self.build_path).lstrip('/') + return six.text_type(self.build_path).lstrip("/") class Buildable404View(BuildableTemplateView): """ The default Django 404 page, but built out. """ - build_path = '404.html' - template_name = '404.html' + + build_path = "404.html" + template_name = "404.html" class BuildableRedirectView(RedirectView, BuildableMixin): @@ -177,6 +173,7 @@ class BuildableRedirectView(RedirectView, BuildableMixin): The URL where redirect will send the user. Operates in the same way as the standard generic RedirectView. 
""" + permanent = True def get_content(self): @@ -196,10 +193,7 @@ def build_method(self): return self.build def build(self): - logger.debug("Building redirect from %s to %s" % ( - self.build_path, - self.get_redirect_url() - )) + logger.debug("Building redirect from %s to %s" % (self.build_path, self.get_redirect_url())) self.request = self.create_request(self.build_path) path = os.path.join(settings.BUILD_DIR, self.build_path) self.prep_directory(self.build_path) @@ -223,19 +217,16 @@ def get_redirect_url(self, *args, **kwargs): return url def post_publish(self, bucket): - logger.debug("Adding S3 redirect header from {} to in {} to {}".format( - self.build_path, - bucket.name, - self.get_redirect_url() - )) + logger.debug( + "Adding S3 redirect header from {} to in {} to {}".format( + self.build_path, bucket.name, self.get_redirect_url() + ) + ) s3_client, s3_resource = get_s3_client() s3_client.copy_object( - ACL='public-read', + ACL="public-read", Bucket=bucket.name, - CopySource={ - 'Bucket': bucket.name, - 'Key': self.build_path - }, + CopySource={"Bucket": bucket.name, "Key": self.build_path}, Key=self.build_path, - WebsiteRedirectLocation=self.get_redirect_url() + WebsiteRedirectLocation=self.get_redirect_url(), ) diff --git a/bigvince/settings_.py b/bigvince/settings_.py index a8eab58..f2b33e1 100644 --- a/bigvince/settings_.py +++ b/bigvince/settings_.py @@ -54,7 +54,7 @@ ROOT_DIR = environ.Path(__file__) - 3 # any change that requires database migrations is a minor release -VERSION = "2.1.11" +VERSION = "3.0.0" # Quick-start development settings - unsuitable for production # See https://docs.djangoproject.com/en/2.1/howto/deployment/checklist/ diff --git a/cogauth/backend.py b/cogauth/backend.py index 4deafb2..7fd521b 100644 --- a/cogauth/backend.py +++ b/cogauth/backend.py @@ -33,6 +33,7 @@ from django.contrib.auth.backends import ModelBackend from django.contrib.auth import get_user_model from django.contrib.auth.hashers import make_password + try: from django.utils.six import iteritems except: @@ -48,51 +49,52 @@ from vinny.models import VinceAPIToken from rest_framework import exceptions from rest_framework.authentication import BaseAuthentication, TokenAuthentication, get_authorization_header -from django.utils.encoding import smart_text -from django.utils.translation import ugettext as _ +from django.utils.encoding import smart_str as smart_text +from django.utils.translation import gettext as _ from bigvince.utils import get_cognito_url, get_cognito_pool_url import traceback from lib.vince import utils as vinceutils + logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) - class CognitoUser(Cognito): user_class = get_user_model() - COGNITO_ATTRS = getattr(settings, 'COGNITO_ATTR_MAPPING', - { 'username': 'username', - 'email':'email', - 'given_name' : 'first_name', - 'family_name':'last_name', - 'locale':'country' - } - ) + COGNITO_ATTRS = getattr( + settings, + "COGNITO_ATTR_MAPPING", + { + "username": "username", + "email": "email", + "given_name": "first_name", + "family_name": "last_name", + "locale": "country", + }, + ) def get_user_obj(self, username=None, attribute_list=[], metadata={}, attr_map={}): - user_attrs = cognito_to_dict(attribute_list,CognitoUser.COGNITO_ATTRS) + user_attrs = cognito_to_dict(attribute_list, CognitoUser.COGNITO_ATTRS) django_fields = [f.name for f in CognitoUser.user_class._meta.get_fields()] log_attrs = user_attrs.copy() - if 'api_key' in user_attrs: - log_attrs['api_key'] = "RESERVED" + if "api_key" in user_attrs: + 
log_attrs["api_key"] = "RESERVED" logger.debug(f"User attributes in Cognito is {log_attrs}") extra_attrs = {} # need to iterate over a copy for k, v in user_attrs.copy().items(): if k not in django_fields: - extra_attrs.update({k: user_attrs.pop(k, None) }) - if getattr(settings, 'COGNITO_CREATE_UNKNOWN_USERS', True): - user, created = CognitoUser.user_class.objects.update_or_create( - username=username, - defaults=user_attrs) + extra_attrs.update({k: user_attrs.pop(k, None)}) + if getattr(settings, "COGNITO_CREATE_UNKNOWN_USERS", True): + user, created = CognitoUser.user_class.objects.update_or_create(username=username, defaults=user_attrs) if user: if settings.VINCE_NAMESPACE == "vinny": try: for k, v in extra_attrs.items(): setattr(user.vinceprofile, k, v) - #logger.debug(f"{k}:{v}") + # logger.debug(f"{k}:{v}") user.vinceprofile.save() except Exception as e: logger.debug(f"Vinceprofile probably doesn't exist for user {username}, error returned {e}") @@ -120,13 +122,14 @@ def get_user_obj(self, username=None, attribute_list=[], metadata={}, attr_map={ except Exception as e: logger.debug(f"vinceprofile probably does not exist for {username}, returned error is {e}") try: - for k, v in extra_attrs.items(): + for k, v in extra_attrs.items(): setattr(user.usersettings, k, v) user.usersettings.save() except: logger.debug(f"usersettings probably doesn't exist for {username}") return user + class CognitoAuthenticate(ModelBackend): def authenticate(self, request, username=None, password=None): ip = vinceutils.get_ip(request) @@ -135,115 +138,122 @@ def authenticate(self, request, username=None, password=None): settings.COGNITO_USER_POOL_ID, settings.COGNITO_APP_ID, user_pool_region=settings.COGNITO_REGION, - access_key=getattr(settings, 'AWS_ACCESS_KEY_ID', None), - secret_key=getattr(settings, 'AWS_SECRET_ACCESS_KEY', None), - username=username) + access_key=getattr(settings, "AWS_ACCESS_KEY_ID", None), + secret_key=getattr(settings, "AWS_SECRET_ACCESS_KEY", None), + username=username, + ) try: logger.debug(f"trying to authenticate {username} from IP {ip}") cognito_user.authenticate(password) except ForceChangePasswordException: - request.session['FORCEPASSWORD']=True - request.session['username']=username + request.session["FORCEPASSWORD"] = True + request.session["username"] = username return None except SoftwareTokenException as e: - request.session['MFAREQUIRED']= "SOFTWARE_TOKEN_MFA" - request.session['username']=username - request.session['MFASession']=cognito_user.session - request.session['DEVICE_NAME'] = str(e) + request.session["MFAREQUIRED"] = "SOFTWARE_TOKEN_MFA" + request.session["username"] = username + request.session["MFASession"] = cognito_user.session + request.session["DEVICE_NAME"] = str(e) request.session.save() return None except SMSMFAException: - request.session['MFAREQUIRED']="SMS_MFA" - request.session['username']=username - request.session['MFASession']=cognito_user.session + request.session["MFAREQUIRED"] = "SMS_MFA" + request.session["username"] = username + request.session["MFASession"] = cognito_user.session request.session.save() return None except (Boto3Error, ClientError) as e: - error_code = e.response['Error']['Code'] + error_code = e.response["Error"]["Code"] logger.debug(f"error authenticating user {username} error: {e} {error_code} from IP {ip}") if error_code == "PasswordResetRequiredException": logger.debug(f"reset password needed for {username} from IP {ip}") - request.session['RESETPASSWORD']=True - request.session['username']=username + 
request.session["RESETPASSWORD"] = True + request.session["username"] = username return None if error_code == "UserNotConfirmedException": logger.debug(f"User {username} did not confirm their account from IP {ip}") - #get user + # get user user = User.objects.filter(username=username).first() if user: - request.session['NOTCONFIRMED'] = True - request.session['CONFIRM_ID'] = user.id + request.session["NOTCONFIRMED"] = True + request.session["CONFIRM_ID"] = user.id return None - if error_code in [ 'NotAuthorizedException', 'UserNotFoundException']: + if error_code in ["NotAuthorizedException", "UserNotFoundException"]: return None else: return None - elif request.session.get('ACCESS_TOKEN'): + elif request.session.get("ACCESS_TOKEN"): # no password means we are either getting the code and trading it in # for tokens or we already have tokens - in which case we just need to get # the user and return - client= boto3.client('cognito-idp', - endpoint_url=get_cognito_url(), region_name=settings.COGNITO_REGION) - user = client.get_user(AccessToken=request.session['ACCESS_TOKEN']) + client = boto3.client("cognito-idp", endpoint_url=get_cognito_url(), region_name=settings.COGNITO_REGION) + user = client.get_user(AccessToken=request.session["ACCESS_TOKEN"]) # the username returned is the unique id, which doesn't help us since we use # emails for username - so get email and return CognitoUser - email = list(filter(lambda email: email['Name'] == 'email', user['UserAttributes']))[0]['Value'] - username=email + email = list(filter(lambda email: email["Name"] == "email", user["UserAttributes"]))[0]["Value"] + username = email cognito_user = CognitoUser( settings.COGNITO_USER_POOL_ID, settings.COGNITO_APP_ID, user_pool_region=settings.COGNITO_REGION, - access_key=getattr(settings, 'AWS_ACCESS_KEY_ID', None), - secret_key=getattr(settings, 'AWS_SECRET_ACCESS_KEY', None), - username=username) - - cognito_user.access_token= request.session['ACCESS_TOKEN'] - cognito_user.refresh_token = request.session['REFRESH_TOKEN'] + access_key=getattr(settings, "AWS_ACCESS_KEY_ID", None), + secret_key=getattr(settings, "AWS_SECRET_ACCESS_KEY", None), + username=username, + ) + + cognito_user.access_token = request.session["ACCESS_TOKEN"] + cognito_user.refresh_token = request.session["REFRESH_TOKEN"] else: - headers={'Content-Type': 'application/x-www-form-urlencoded'} + headers = {"Content-Type": "application/x-www-form-urlencoded"} data = { - 'grant_type': 'authorization_code', - 'client_id': settings.COGNITO_APP_ID, - 'redirect_uri':settings.COGNITO_REDIRECT_TO, - 'code': username + "grant_type": "authorization_code", + "client_id": settings.COGNITO_APP_ID, + "redirect_uri": settings.COGNITO_REDIRECT_TO, + "code": username, } - r = requests.post(settings.COGNITO_OAUTH_URL, headers=headers,data=data) - if not(r == None or (r.status_code != requests.codes.ok)): + r = requests.post(settings.COGNITO_OAUTH_URL, headers=headers, data=data) + if not (r == None or (r.status_code != requests.codes.ok)): rj = r.json() - access_token = rj['access_token'] - refresh_token = rj['refresh_token'] - id_token=rj['id_token'] + access_token = rj["access_token"] + refresh_token = rj["refresh_token"] + id_token = rj["id_token"] - u = Cognito(settings.COGNITO_USER_POOL_ID, settings.COGNITO_APP_ID, - user_pool_region=settings.COGNITO_REGION, - id_token=id_token, refresh_token=refresh_token, - access_token=access_token) + u = Cognito( + settings.COGNITO_USER_POOL_ID, + settings.COGNITO_APP_ID, + user_pool_region=settings.COGNITO_REGION, + 
id_token=id_token, + refresh_token=refresh_token, + access_token=access_token, + ) u.check_token() - - client= boto3.client('cognito-idp', - endpoint_url=get_cognito_url(), region_name=settings.COGNITO_REGION) + + client = boto3.client( + "cognito-idp", endpoint_url=get_cognito_url(), region_name=settings.COGNITO_REGION + ) user = client.get_user(AccessToken=access_token) - username = user['Username'] + username = user["Username"] cognito_user = CognitoUser( settings.COGNITO_USER_POOL_ID, settings.COGNITO_APP_ID, user_pool_region=settings.COGNITO_REGION, - access_key=getattr(settings, 'AWS_ACCESS_KEY_ID', None), - secret_key=getattr(settings, 'AWS_SECRET_ACCESS_KEY', None), - username=username) - - cognito_user.verify_token(id_token, 'id_token', 'id') - cognito_user.access_token= access_token + access_key=getattr(settings, "AWS_ACCESS_KEY_ID", None), + secret_key=getattr(settings, "AWS_SECRET_ACCESS_KEY", None), + username=username, + ) + + cognito_user.verify_token(id_token, "id_token", "id") + cognito_user.access_token = access_token cognito_user.refresh_token = refresh_token - cognito_user.token_type = rj['token_type'] - + cognito_user.token_type = rj["token_type"] + else: return None - + # now we have a cognito user - set session variables and return if cognito_user: user = cognito_user.get_user() @@ -253,10 +263,10 @@ def authenticate(self, request, username=None, password=None): return None if user: - request.session['ACCESS_TOKEN'] = cognito_user.access_token - request.session['ID_TOKEN'] = cognito_user.id_token - request.session['REFRESH_TOKEN'] = cognito_user.refresh_token - #request.session.save() + request.session["ACCESS_TOKEN"] = cognito_user.access_token + request.session["ID_TOKEN"] = cognito_user.id_token + request.session["REFRESH_TOKEN"] = cognito_user.refresh_token + # request.session.save() logger.info(f"USER {user} is authenticated from ip {ip}") return user @@ -267,10 +277,10 @@ class CognitoAuthenticateAPI(ModelBackend): def authenticate(self, request): """For rest_framework if successfully authenticated using CognitoAuth the response wil include a tuple (request.user,request.auth) - In case of Session based authentications + In case of Session based authentications request.user will be a Django User instance. request.auth will be None. - https://www.django-rest-framework.org/api-guide/authentication/ + https://www.django-rest-framework.org/api-guide/authentication/ """ try: ip = vinceutils.get_ip(request) @@ -280,12 +290,10 @@ def authenticate(self, request): return user, None else: logger.warn(f"Failed API authentication using session for User {user} from IP {ip}") - raise exceptions.AuthenticationFailed(_('Invalid API session attempted')) + raise exceptions.AuthenticationFailed(_("Invalid API session attempted")) except Exception as e: logger.warn(f"Failed API authentication for session error is {e}") - raise exceptions.AuthenticationFailed(_('Invalid API no session or token header was provided')) - - + raise exceptions.AuthenticationFailed(_("Invalid API no session or token header was provided")) class HashedTokenAuthentication(TokenAuthentication): @@ -295,12 +303,14 @@ class HashedTokenAuthentication(TokenAuthentication): HTTP header, prepended with the string "Token ". 
For example: Authorization: Token 401f7ac837da42b97f613d789819ff93537bee6a """ + model = VinceAPIToken def get_model(self): if self.model is not None: return self.model from rest_framework.authtoken.models import Token + return Token """ @@ -308,36 +318,36 @@ def get_model(self): * key -- The string identifying the token * user -- The user to which the token belongs """ + def authenticate(self, request): ip = vinceutils.get_ip(request) - setattr(self,"ip",ip) + setattr(self, "ip", ip) return super(HashedTokenAuthentication, self).authenticate(request) - def authenticate_credentials(self, key): - if hasattr(self,'ip'): + if hasattr(self, "ip"): ip = self.ip else: ip = "Unknown" model = self.get_model() hashed_key = make_password(key, settings.SECRET_KEY) try: - token = model.objects.select_related('user').get(key=hashed_key) + token = model.objects.select_related("user").get(key=hashed_key) except model.DoesNotExist: logger.warn(f"Failed API auth for token that does not exist {key} from IP {ip}") - raise exceptions.AuthenticationFailed(_('Invalid token.')) + raise exceptions.AuthenticationFailed(_("Invalid token.")) except Exception as e: logger.warn(f"Failed API auth for token Error {e} from IP {ip}") - raise exceptions.AuthenticationFailed(_('Unknown Token error.')) - + raise exceptions.AuthenticationFailed(_("Unknown Token error.")) + if not token.user.is_active: logger.warn(f"Failed API auth for {token.user} user is inactive or deleted from IP {ip}") - raise exceptions.AuthenticationFailed(_('User inactive or deleted.')) + raise exceptions.AuthenticationFailed(_("User inactive or deleted.")) logger.info(f"Success user {token.user} is authenticated using API Token from IP {ip}") return (token.user, token) - + class JSONWebTokenAuthentication(BaseAuthentication): """Token based authentication using the JSON Web Token standard.""" @@ -355,7 +365,7 @@ def authenticate(self, request): except TokenError: raise exceptions.AuthenticationFailed() logger.debug(f"JSONWeb returned payload is {jwt_payload}") - username=jwt_payload['email'] + username = jwt_payload["email"] user = User.objects.get(username=username) return (user, jwt_token) @@ -369,28 +379,21 @@ def get_jwt_token(self, request): msg = _("Invalid Authorization header. No credentials provided.") raise exceptions.AuthenticationFailed(msg) elif len(auth) > 2: - msg = _( - "Invalid Authorization header. Credentials string " - "should not contain spaces." - ) + msg = _("Invalid Authorization header. Credentials string " "should not contain spaces.") raise exceptions.AuthenticationFailed(msg) return auth[1] - def get_token_validator(self, request): return TokenValidator( settings.COGNITO_REGION, settings.COGNITO_USER_POOL_ID, settings.COGNITO_APP_ID, ) - + def authenticate_header(self, request): """ Method required by the DRF in order to return 401 responses for authentication failures, instead of 403. More details in https://www.django-rest-framework.org/api-guide/authentication/#custom-authentication. 
""" return "Bearer: api" - - - diff --git a/cogauth/views.py b/cogauth/views.py index ef5c900..6d43154 100644 --- a/cogauth/views.py +++ b/cogauth/views.py @@ -32,7 +32,7 @@ from django.forms.utils import ErrorList from django.http import Http404 from django.shortcuts import render, redirect, get_object_or_404 -from django.utils.translation import ugettext as _ +from django.utils.translation import gettext as _ from django.utils.decorators import method_decorator from django.core.exceptions import PermissionDenied @@ -75,7 +75,7 @@ import traceback from boto3.exceptions import Boto3Error from botocore.exceptions import ClientError, ParamValidationError -from django.utils.http import is_safe_url +from django.utils.http import url_has_allowed_host_and_scheme as is_safe_url from django.http.response import JsonResponse from bigvince.utils import get_cognito_url, get_cognito_pool_url from vinny.models import VinceCommEmail diff --git a/kbworker/urls.py b/kbworker/urls.py index d351dda..27388d4 100644 --- a/kbworker/urls.py +++ b/kbworker/urls.py @@ -41,10 +41,9 @@ 1. Import the include() function: from django.conf.urls import url, include 2. Add a URL to urlpatterns: url(r'^blog/', include('blog.urls')) """ -from django.conf.urls import url +from django.urls import include, re_path from kbworker.views import check_for_updates urlpatterns = [ - url(r'^check-for-updates/$', check_for_updates, name='checkupdate'), + re_path(r"^check-for-updates/$", check_for_updates, name="checkupdate"), ] - diff --git a/requirements.txt b/requirements.txt index 7885f80..cac846e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,11 @@ amqp==5.1.1 appdirs==1.4.4 -asgiref==3.5.2 +asgiref==3.6.0 asn1crypto==1.5.1 async-timeout==4.0.2 attrs==22.1.0 awscli==1.27.11 +backports.zoneinfo==0.2.1 beautifulsoup4==4.11.1 billiard==4.0.2 bleach==5.0.1 @@ -20,10 +21,10 @@ charset-normalizer==2.1.1 click==8.1.3 colorama==0.4.4 cryptography==42.0.4 -cvelib==1.1.0 +cvelib==1.3.0 Deprecated==1.2.13 dictdiffer==0.9.0 -Django==3.2.24 +Django==4.2 django-appconf==1.0.5 django-countries==7.4.2 django-environ==0.9.0 @@ -50,7 +51,8 @@ Markdown==3.1 packaging==21.3 pinax-messages==3.0.0 pip-autoremove==0.10.0 -pkgutil_resolve_name==1.3.10 +pkgutil-resolve-name==1.3.10 +psycopg2==2.9.9 psycopg2-binary==2.9.5 pyasn1==0.4.8 pycparser==2.21 @@ -74,7 +76,7 @@ simplejson==3.18.0 six==1.16.0 soupsieve==2.3.2.post1 sqlparse==0.4.4 -typing_extensions==4.4.0 +typing-extensions==4.4.0 urllib3==1.26.18 vine==5.0.0 watchtower==3.0.0 diff --git a/vince/__init__.py b/vince/__init__.py index c56af29..2961763 100644 --- a/vince/__init__.py +++ b/vince/__init__.py @@ -27,9 +27,11 @@ # DM21-1126 ######################################################################## from __future__ import absolute_import, unicode_literals -#from .celery import app as celery_app +import django -#__all__ = ['celery_app'] -default_app_config = 'vince.apps.VinceTrackConfig' +# from .celery import app as celery_app +# __all__ = ['celery_app'] +if django.VERSION < (3, 2): + default_app_config = "vince.apps.VinceTrackConfig" diff --git a/vince/admin.py b/vince/admin.py index 4a88467..bdff13d 100644 --- a/vince/admin.py +++ b/vince/admin.py @@ -35,9 +35,42 @@ from django.contrib.auth import get_user_model from django.contrib.auth import views as auth_views from django.contrib.admin.views.decorators import staff_member_required -from django.utils.translation import ugettext_lazy as _ -from vince.models import TicketQueue, Ticket, FollowUp, CaseTemplate, UserSettings, 
Contact, QueuePermissions, CasePermissions, TicketThread, CaseAssignment, CaseAction, ContactAssociation, CaseParticipant, CalendarEvent, VulNote, BounceEmailNotification -from vince.models import TicketChange, Attachment, VulnerabilityCase, EmailTemplate, EmailContact, AdminPGPEmail, Artifact, Vulnerability, VendorStatus, VulnerableVendor, VinceSMIMECertificate, UserRole, VinceReminder, GroupSettings, TagManager +from django.utils.translation import gettext_lazy as _ +from vince.models import ( + TicketQueue, + Ticket, + FollowUp, + CaseTemplate, + UserSettings, + Contact, + QueuePermissions, + CasePermissions, + TicketThread, + CaseAssignment, + CaseAction, + ContactAssociation, + CaseParticipant, + CalendarEvent, + VulNote, + BounceEmailNotification, +) +from vince.models import ( + TicketChange, + Attachment, + VulnerabilityCase, + EmailTemplate, + EmailContact, + AdminPGPEmail, + Artifact, + Vulnerability, + VendorStatus, + VulnerableVendor, + VinceSMIMECertificate, + UserRole, + VinceReminder, + GroupSettings, + TagManager, +) from vinny.models import Thread, Message, VTCaseRequest, CaseMember, VendorAction from django.contrib.admin.helpers import ActionForm from cogauth.views import COGLoginView @@ -54,12 +87,15 @@ def has_delete_permission(self, request, obj=None): if request.user.is_superuser: return True return False - + + @admin.register(TicketQueue) class QueueAdmin(admin.ModelAdmin): - list_display = ('title', 'slug', 'default_owner') + list_display = ("title", "slug", "default_owner") prepopulated_fields = {"slug": ("title",)} - inlines = [QueuePermissionInline,] + inlines = [ + QueuePermissionInline, + ] def has_delete_permission(self, request, obj=None): return False @@ -71,9 +107,8 @@ def get_form(self, request, obj=None, **kwargs): if not is_superuser: disabled_fields |= { - 'group', - 'group_read' - 'group_write', + "group", + "group_read" "group_write", } for f in disabled_fields: @@ -83,48 +118,51 @@ def get_form(self, request, obj=None, **kwargs): return form - class CasePermissionInline(admin.TabularInline): model = CasePermissions + class CaseParticipantInline(admin.TabularInline): model = CaseParticipant + def bulk_reassign(modeladmin, request, queryset): ct = queryset.count() - if int(request.POST['user']) == 0: + if int(request.POST["user"]) == 0: title = "Bulk unassign ticket by {request.user.usersettings.vince_username}" else: - assignee = User.objects.get(id=request.POST['user']).usersettings.vince_username + assignee = User.objects.get(id=request.POST["user"]).usersettings.vince_username title = f"Bulk reassign ticket to user {assignee} by {request.user.usersettings.vince_username}" for x in queryset: ca = FollowUp(ticket=x, title=title, user=request.user) ca.save() - - if int(request.POST['user']) == 0: + + if int(request.POST["user"]) == 0: queryset.update(assigned_to=None) else: - queryset.update(assigned_to=request.POST['user']) + queryset.update(assigned_to=request.POST["user"]) messages.success(request, f"Successfully updated {ct} tickets") - -bulk_reassign.short_description = 'Reassign tickets to another user' + + +bulk_reassign.short_description = "Reassign tickets to another user" def bulk_tktstatuschange(modeladmin, request, queryset): ct = queryset.count() status_dict = dict(Ticket.STATUS_CHOICES) - + for x in queryset: title = f"Bulk ticket status change to {status_dict[int(request.POST['status'])]} by {request.user.usersettings.vince_username}" ca = FollowUp(ticket=x, title=title, user=request.user) ca.save() - - queryset.update(status = 
request.POST['status']) - + + queryset.update(status=request.POST["status"]) + messages.success(request, f"Successfully updated {ct} tickets") -bulk_tktstatuschange.short_description = 'Change ticket status' + +bulk_tktstatuschange.short_description = "Change ticket status" def bulk_casestatuschange(modeladmin, request, queryset): @@ -134,15 +172,17 @@ def bulk_casestatuschange(modeladmin, request, queryset): title = f"Bulk case status change to {status_dict[int(request.POST['status'])]} by {request.user.usersettings.vince_username}" ca = CaseAction(case=x, user=request.user, title=title, action_type=0) ca.save() - queryset.update(status = request.POST['status']) + queryset.update(status=request.POST["status"]) messages.success(request, f"Successfully updated {ct} cases") -bulk_casestatuschange.short_description = 'Change case status' + +bulk_casestatuschange.short_description = "Change case status" + def bulk_moveticket(modeladmin, request, queryset): ct = queryset.count() - if request.POST.get('case') != "": - case = VulnerabilityCase.objects.filter(vuid=request.POST['case']).first() + if request.POST.get("case") != "": + case = VulnerabilityCase.objects.filter(vuid=request.POST["case"]).first() queue = TicketQueue.objects.filter(slug="case").first() if case and queue: queryset.update(case=case, queue=queue) @@ -151,34 +191,39 @@ def bulk_moveticket(modeladmin, request, queryset): messages.error(request, f"Case doesn't exist") else: messages.error(request, f"Case doesn't exist") + + bulk_moveticket.short_description = "Move Tickets to Case Queue" + def bulk_reassign_cases(modeladmin, request, queryset): ct = queryset.count() - if int(request.POST['user']) == 0: + if int(request.POST["user"]) == 0: assignee = "None" else: - assignee = User.objects.get(id=request.POST['user']).usersettings.vince_username + assignee = User.objects.get(id=request.POST["user"]).usersettings.vince_username for x in queryset: title = f"Bulk reassign owner to user {assignee} by {request.user.usersettings.vince_username}" ca = CaseAction(case=x, user=request.user, title=title, action_type=0) ca.save() - if int(request.POST['user']) == 0: + if int(request.POST["user"]) == 0: queryset.update(owner=None) else: - queryset.update(owner=request.POST['user']) + queryset.update(owner=request.POST["user"]) messages.success(request, f"Successfully updated {ct} cases") -bulk_reassign_cases.short_description = 'Change case ownership to another user' + +bulk_reassign_cases.short_description = "Change case ownership to another user" + def bulk_add_user_case(modeladmin, request, queryset): ct = queryset.count() - if int(request.POST['user']) > 0: + if int(request.POST["user"]) > 0: for x in queryset: - assignee = User.objects.get(id=request.POST['user']) + assignee = User.objects.get(id=request.POST["user"]) CaseAssignment.objects.get_or_create(assigned=assignee, case=x) title = f"Bulk assigned user {assignee.usersettings.vince_username} by {request.user.usersettings.vince_username}" ca = CaseAction(case=x, user=request.user, title=title, action_type=0) @@ -187,13 +232,15 @@ def bulk_add_user_case(modeladmin, request, queryset): else: messages.error(request, f"Use bulk unassigment action to unassign user from case") + bulk_add_user_case.short_description = "Add user to case assignment" + def bulk_unassign_user_case(modeladmin, request, queryset): ct = queryset.count() - if int(request.POST['user']) > 0: + if int(request.POST["user"]) > 0: for x in queryset: - assignee = User.objects.get(id=request.POST['user']) + assignee = 
User.objects.get(id=request.POST["user"]) CaseAssignment.objects.filter(assigned=assignee, case=x).delete() title = f"Bulk unassigned user {assignee.usersettings.vince_username} by {request.user.usersettings.vince_username}" ca = CaseAction(case=x, user=request.user, title=title, action_type=0) @@ -201,68 +248,75 @@ def bulk_unassign_user_case(modeladmin, request, queryset): messages.success(request, f"Successfully updated {ct} cases") else: messages.error(request, f"Please select a user to unassign from selected cases") + + bulk_unassign_user_case.short_description = "Unassign user from all selected cases" + class BulkAssignmentForm(ActionForm): try: - USER_CHOICES = [(0, '--------')] + [(q.id, q.usersettings.preferred_username) for q in get_user_model().objects.all()] + USER_CHOICES = [(0, "--------")] + [ + (q.id, q.usersettings.preferred_username) for q in get_user_model().objects.all() + ] except: USER_CHOICES = [] - - user = forms.ChoiceField(choices=USER_CHOICES, - label=_('Assign a User'), - required=False - ) - status = forms.ChoiceField(choices=Ticket.STATUS_CHOICES, - label=_('Change Ticket Status'), - required=False) + user = forms.ChoiceField(choices=USER_CHOICES, label=_("Assign a User"), required=False) + + status = forms.ChoiceField(choices=Ticket.STATUS_CHOICES, label=_("Change Ticket Status"), required=False) + + case = forms.CharField(label=_("Case ID Number"), required=False) - case = forms.CharField(label=_('Case ID Number'), - required=False) - class CaseBulkAssignmentForm(ActionForm): try: - USER_CHOICES = [(0, '--------')] + [(q.id, q.usersettings.preferred_username) for q in get_user_model().objects.all()] + USER_CHOICES = [(0, "--------")] + [ + (q.id, q.usersettings.preferred_username) for q in get_user_model().objects.all() + ] except: USER_CHOICES = [] - user = forms.ChoiceField(choices=USER_CHOICES, - label=_('Assign a User'), - required=False - ) + user = forms.ChoiceField(choices=USER_CHOICES, label=_("Assign a User"), required=False) + + status = forms.ChoiceField(choices=VulnerabilityCase.STATUS_CHOICES, label=_("Change Case Status"), required=False) - status = forms.ChoiceField(choices=VulnerabilityCase.STATUS_CHOICES, - label=_('Change Case Status'), - required=False - ) class CaseAssignedFilter(admin.SimpleListFilter): title = "Assigned" - parameter_name = 'assigned_to' + parameter_name = "assigned_to" def lookups(self, request, model_admin): - return [(0, '--------')] + [(q.id, q.username) for q in get_user_model().objects.all()] + return [(0, "--------")] + [(q.id, q.username) for q in get_user_model().objects.all()] def queryset(self, request, queryset): if self.value() == 0: - assignments = CaseAssignment.objects.all().values_list('case__id', flat=True) + assignments = CaseAssignment.objects.all().values_list("case__id", flat=True) return queryset.exclude(id__in=assignments) elif self.value(): - assignments = CaseAssignment.objects.filter(assigned=self.value()).values_list('case__id', flat=True) + assignments = CaseAssignment.objects.filter(assigned=self.value()).values_list("case__id", flat=True) return queryset.filter(id__in=assignments) else: return queryset - @admin.register(VulnerabilityCase) class VinceCaseAdmin(admin.ModelAdmin): - list_display = ('vuid', 'title', 'team_owner', 'created', 'owner', 'status', 'product_name', 'case_get_assigned_to') - inlines = [CasePermissionInline, CaseParticipantInline, ] - search_fields = ('vuid', 'title', 'product_name') - list_filter = ('team_owner', 'owner', 'status', CaseAssignedFilter) + list_display = 
( + "vuid", + "title", + "team_owner", + "created", + "owner", + "status", + "product_name", + "case_get_assigned_to", + ) + inlines = [ + CasePermissionInline, + CaseParticipantInline, + ] + search_fields = ("vuid", "title", "product_name") + list_filter = ("team_owner", "owner", "status", CaseAssignedFilter) action_form = CaseBulkAssignmentForm actions = [bulk_reassign_cases, bulk_add_user_case, bulk_unassign_user_case, bulk_casestatuschange] @@ -270,38 +324,45 @@ def has_delete_permission(self, request, obj=None): if request.user.is_superuser: return True return False - + def case_get_assigned_to(self, obj): return obj.get_assigned_to - case_get_assigned_to.short_description = _('Users assigned') - + case_get_assigned_to.short_description = _("Users assigned") + + @admin.register(EmailTemplate) -class EmailTemplateAdmin (admin.ModelAdmin): - list_display = ('template_name', 'subject', 'heading', 'plain_text', 'locale' ) - search_fields = ('template_name', 'locale', 'heading') - list_filter = ('locale', ) - - def has_delete_permission(self, request, obj=None): +class EmailTemplateAdmin(admin.ModelAdmin): + list_display = ("template_name", "subject", "heading", "plain_text", "locale") + search_fields = ("template_name", "locale", "heading") + list_filter = ("locale",) + + def has_delete_permission(self, request, obj=None): return False class AssignedFilter(admin.SimpleListFilter): title = "Filter by Assigned" - parameter_name = 'assigned_to' - + parameter_name = "assigned_to" + def lookups(self, request, model_admin): - return [(0, '--------')] + [(q.id, q.username) for q in get_user_model().objects.all()] + return [(0, "--------")] + [(q.id, q.username) for q in get_user_model().objects.all()] + def queryset(self, request, queryset): return queryset.filter(assigned_to=self.value()) - - + + @admin.register(Ticket) class TicketAdmin(admin.ModelAdmin): - search_fields=['title', 'case__vuid'] - list_display = ('title', 'status', 'assigned_to', 'queue', ) - date_hierarchy = 'created' - list_filter = ('queue', 'assigned_to', 'status') + search_fields = ["title", "case__vuid"] + list_display = ( + "title", + "status", + "assigned_to", + "queue", + ) + date_hierarchy = "created" + list_filter = ("queue", "assigned_to", "status") action_form = BulkAssignmentForm list_per_page = 250 actions = [bulk_reassign, bulk_tktstatuschange, bulk_moveticket] @@ -310,7 +371,7 @@ def has_delete_permission(self, request, obj=None): if request.user.is_superuser: return True return False - + def hidden_submitter_email(self, ticket): if ticket.submitter_email: username, domain = ticket.submitter_email.split("@") @@ -320,54 +381,57 @@ def hidden_submitter_email(self, ticket): else: return ticket.submitter_email - + class TicketChangeInline(admin.StackedInline): model = TicketChange + class AttachmentInline(admin.StackedInline): model = Attachment class ReminderAdmin(admin.ModelAdmin): - list_display = ('title', 'user', 'created_by', 'alert_date') - list_filter = ('user', 'alert_date', 'created_by') + list_display = ("title", "user", "created_by", "alert_date") + list_filter = ("user", "alert_date", "created_by") list_per_page = 250 def has_delete_permission(self, request, obj=None): if request.user.is_staff: return True return False - + + @admin.register(FollowUp) class FollowUpAdmin(admin.ModelAdmin): inlines = [TicketChangeInline, AttachmentInline] - list_display = ('ticket_get_ticket_for_url', 'title', 'date', 'ticket', 'user', 'new_status') - list_filter = ('user', 'date', 'new_status') + list_display = 
("ticket_get_ticket_for_url", "title", "date", "ticket", "user", "new_status") + list_filter = ("user", "date", "new_status") def ticket_get_ticket_for_url(self, obj): return obj.ticket.ticket_for_url - ticket_get_ticket_for_url.short_description = _('Slug') - + ticket_get_ticket_for_url.short_description = _("Slug") + + class UserSettingsInline(admin.StackedInline): - model=UserSettings - can_delete=False - verbose_name_plural='UserSettings' - fk_name='user' - fields=('org', 'preferred_username', 'case_template', 'contacts_read', 'contacts_write') + model = UserSettings + can_delete = False + verbose_name_plural = "UserSettings" + fk_name = "user" + fields = ("org", "preferred_username", "case_template", "contacts_read", "contacts_write") -class CustomUserAdmin(UserAdmin): - inlines=(UserSettingsInline,) - list_display = ('username', 'first_name', 'last_name', 'get_preferred_username') - list_select_related = ('usersettings',) - actions=['get_preferred_username'] +class CustomUserAdmin(UserAdmin): + inlines = (UserSettingsInline,) + list_display = ("username", "first_name", "last_name", "get_preferred_username") + list_select_related = ("usersettings",) + actions = ["get_preferred_username"] def has_delete_permission(self, request, obj=None): if request.user.is_superuser: return True return False - + def get_form(self, request, obj=None, **kwargs): form = super().get_form(request, obj, **kwargs) is_superuser = request.user.is_superuser @@ -375,81 +439,82 @@ def get_form(self, request, obj=None, **kwargs): if not is_superuser: disabled_fields |= { - 'username', - 'is_superuser', - 'email', - 'user_permissions', + "username", + "is_superuser", + "email", + "user_permissions", } - if (not is_superuser - and obj is not None - and obj == request.user - ): + if not is_superuser and obj is not None and obj == request.user: disabled_fields |= { - 'is_staff', - 'is_superuser', - 'groups', - 'user_permissions', + "is_staff", + "is_superuser", + "groups", + "user_permissions", } - + for f in disabled_fields: if f in form.base_fields: form.base_fields[f].disabled = True return form - + def get_preferred_username(self, instance): return instance.usersettings.preferred_username + get_preferred_username.short_description = "Visible" - + def get_inline_instances(self, request, obj=None): if not obj: return list() return super(CustomUserAdmin, self).get_inline_instances(request, obj) + class EmailContactInLine(admin.TabularInline): model = EmailContact - + class ContactAdmin(admin.ModelAdmin): - search_fields=['vendor_name'] - list_display=['vendor_name', 'vendor_type', 'active', "_emails"] + search_fields = ["vendor_name"] + list_display = ["vendor_name", "vendor_type", "active", "_emails"] + + inlines = [EmailContactInLine] - inlines = [ - EmailContactInLine - ] - def has_delete_permission(self, request, obj=None): if request.user.is_superuser: return True return False - + def _emails(self, obj): return obj.get_emails() - -#admin.site = CustomAdminSite("default") + + +# admin.site = CustomAdminSite("default") + class AdminPGPEmailAdmin(admin.ModelAdmin): - fields = ('pgp_key_data', 'pgp_key_id', 'email', 'name', 'active') - list_display = ('pgp_key_id', 'email', 'name', 'active') + fields = ("pgp_key_data", "pgp_key_id", "email", "name", "active") + list_display = ("pgp_key_id", "email", "name", "active") def has_delete_permission(self, request, obj=None): if request.user.is_superuser: return True return False + class VulAdmin(admin.ModelAdmin): - list_display = ('get_vul_id', 'cve', 'case', 
'description') - search_fields=['description', 'case__vuid', 'cve'] - actions = ['get_vul_id'] + list_display = ("get_vul_id", "cve", "case", "description") + search_fields = ["description", "case__vuid", "cve"] + actions = ["get_vul_id"] title = "Deleted Vulnerabilities" - + def get_queryset(self, request): qs = super().get_queryset(request) return qs.filter(deleted=True) def get_vul_id(self, instance): return instance.vul + get_vul_id.short_description = "Vul ID" def has_delete_permission(self, request, obj=None): @@ -457,10 +522,11 @@ def has_delete_permission(self, request, obj=None): return True return False + class VulVendorAdmin(admin.ModelAdmin): - search_fields = ['case__vuid', 'case__title', 'vendor', 'contact__vendor_name'] - list_filter = ('deleted', ) - + search_fields = ["case__vuid", "case__title", "vendor", "contact__vendor_name"] + list_filter = ("deleted",) + def has_delete_permission(self, request, obj=None): if request.user.is_superuser: return True @@ -469,25 +535,30 @@ def has_delete_permission(self, request, obj=None): class MessageInline(admin.TabularInline): model = Message - fields = ('content', 'created') + fields = ("content", "created") def has_delete_permission(self, request, obj=None): if request.user.is_superuser: return True return False - + + class ThreadAdmin(admin.ModelAdmin): - list_display = ('id', 'subject', 'to_group', 'from_group', 'case') - inlines = [MessageInline,] - + list_display = ("id", "subject", "to_group", "from_group", "case") + inlines = [ + MessageInline, + ] + def has_delete_permission(self, request, obj=None): if request.user.is_superuser: return True return False + class CaseMemberAdmin(admin.ModelAdmin): - search_fields = ['case__vuid', 'case__title', 'group__groupcontact__contact__vendor_name', 'participant__email'] - + search_fields = ["case__vuid", "case__title", "group__groupcontact__contact__vendor_name", "participant__email"] + + class VUReportInline(admin.TabularInline): model = VUReport @@ -496,91 +567,122 @@ def has_delete_permission(self, request, obj=None): return True return False + class TrackVulNoteAdmin(admin.ModelAdmin): - search_fields = ['case__vuid', 'case__title'] - list_display = ['case'] + search_fields = ["case__vuid", "case__title"] + list_display = ["case"] def has_delete_permission(self, request, obj=None): if request.user.is_superuser: return True return False - + + class VulNoteAdmin(admin.ModelAdmin): - search_fields = ['vuid', 'title'] - list_display = ['vuid', 'title'] - fields = ['vuid', 'title', 'dateupdated', 'datefirstpublished', 'revision_number', 'publicdate', 'published'] - readonly_fields = ['vuid', 'title', 'dateupdated', 'datefirstpublished', 'revision_number'] + search_fields = ["vuid", "title"] + list_display = ["vuid", "title"] + fields = ["vuid", "title", "dateupdated", "datefirstpublished", "revision_number", "publicdate", "published"] + readonly_fields = ["vuid", "title", "dateupdated", "datefirstpublished", "revision_number"] def has_delete_permission(self, request, obj=None): if request.user.is_superuser: return True return False + class VUReportAdmin(admin.ModelAdmin): - search_fields = ['vuid', 'name', 'idnumber'] - list_display = ('vuid', 'name',) - readonly_fields = ['vuid', 'idnumber', 'name', 'overview', 'vulnote', 'search_vector', 'clean_desc', 'impact', 'resolution', 'workarounds', 'sysaffected', 'thanks', 'author', 'public'] - + search_fields = ["vuid", "name", "idnumber"] + list_display = ( + "vuid", + "name", + ) + readonly_fields = [ + "vuid", + "idnumber", + "name", + 
"overview", + "vulnote", + "search_vector", + "clean_desc", + "impact", + "resolution", + "workarounds", + "sysaffected", + "thanks", + "author", + "public", + ] + def has_delete_permission(self, request, obj=None): if request.user.is_superuser: return True return False + class VulPubAdmin(admin.ModelAdmin): def has_delete_permission(self, request, obj=None): if request.user.is_superuser: return True return False - + class VulPubVendorAdmin(admin.ModelAdmin): - search_fields = ['vendor'] - + search_fields = ["vendor"] + def has_delete_permission(self, request, obj=None): if request.user.is_superuser: return True return False + class VulPubVendorRecord(admin.ModelAdmin): - search_fields = ['vendor', 'vuid', 'idnumber'] + search_fields = ["vendor", "vuid", "idnumber"] def has_delete_permission(self, request, obj=None): if request.user.is_superuser: return True return False + class VTCaseRequestAdmin(admin.ModelAdmin): - list_display = ['vrf_id', 'product_name', 'vendor_name', 'user', 'new_vuid','date_submitted', 'coordinator'] - search_fields = ['vrf_id', 'product_name', 'new_vuid', 'vendor_name'] + list_display = ["vrf_id", "product_name", "vendor_name", "user", "new_vuid", "date_submitted", "coordinator"] + search_fields = ["vrf_id", "product_name", "new_vuid", "vendor_name"] class TagManagerAdmin(admin.ModelAdmin): - list_display = ['tag', 'description', 'tag_type', 'team'] - search_fields = ['tag', 'description'] - + list_display = ["tag", "description", "tag_type", "team"] + search_fields = ["tag", "description"] + + class GroupInline(admin.StackedInline): model = GroupSettings can_delete = False - verbose_name_plural = 'Group Settings' - + verbose_name_plural = "Group Settings" + + class GroupAdmin(BaseGroupAdmin): - inlines = (GroupInline, ) - list_display = ('name', 'get_org_name') + inlines = (GroupInline,) + list_display = ("name", "get_org_name") def get_org_name(self, instance): if instance.groupsettings: return instance.groupsettings.organization return "-" + get_org_name.short_description = "Organization Name" + class BounceAdmin(admin.ModelAdmin): - list_display = ['email', 'ticket', 'bounce_date', 'bounce_type', 'action_taken'] - search_fields = ['email', 'subject'] - -admin.site.login = staff_member_required(COGLoginView.as_view(template_name='vince/admin_login.html'), login_url = settings.LOGIN_URL) -admin.site.logout = auth_views.LogoutView.as_view(template_name='vince/tracklogout.html') + list_display = ["email", "ticket", "bounce_date", "bounce_type", "action_taken"] + search_fields = ["email", "subject"] + + +admin.site.login = staff_member_required( + COGLoginView.as_view(template_name="vince/admin_login.html"), login_url=settings.LOGIN_URL +) +admin.site.logout = auth_views.LogoutView.as_view(template_name="vince/tracklogout.html") admin.site.site_header = "VinceTrack Admin" admin.site.site_title = "VinceTrack Admin Portal" @@ -595,9 +697,9 @@ class BounceAdmin(admin.ModelAdmin): admin.site.register(Contact, ContactAdmin) admin.site.register(Vulnerability, VulAdmin) admin.site.register(VinceSMIMECertificate) -#admin.site.register(TicketThread) +# admin.site.register(TicketThread) admin.site.register(AdminPGPEmail, AdminPGPEmailAdmin) -#admin.site.register(Artifact) +# admin.site.register(Artifact) admin.site.register(VulnerableVendor, VulVendorAdmin) admin.site.register(Thread, ThreadAdmin) admin.site.register(VulnerabilityNote, VulNoteAdmin) diff --git a/vince/forms.py b/vince/forms.py index 09928ce..ebf1dda 100644 --- a/vince/forms.py +++ b/vince/forms.py @@ -55,7 
+55,7 @@ from vince.permissions import get_user_gen_queue import traceback import os -from django.utils.encoding import smart_text +from django.utils.encoding import smart_str as smart_text logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -1979,20 +1979,6 @@ def clean_ticket(self): except: raise forms.ValidationError("Invalid Ticket Selection. Use only numeric ID of Ticket.") - # def clean_email(self): - # email = self.cleaned_data["email"] - # logger.debug(f"email is {email}") - # internal = self.cleaned_data["internal"] - # logger.debug(f"internal is {internal}") - # if email in [None, "", "None"] and internal: - # logger.debug("we have reached the if block in which email is none and internal is truey") - # return - # try: - # logger.debug("we have passed the if block in which email is none and internal is truey") - # return email - # except: - # raise forms.ValidationError("Unacceptable email value.") - class ContactForm(forms.ModelForm): vtype = forms.ChoiceField( diff --git a/vince/lib.py b/vince/lib.py index 0167aea..9a4f2e8 100644 --- a/vince/lib.py +++ b/vince/lib.py @@ -46,7 +46,7 @@ from django.core.files import File from django.core.serializers.json import DjangoJSONEncoder from django.utils import timezone -from django.utils.encoding import smart_text +from django.utils.encoding import smart_str as smart_text from django.template.loader import render_to_string, get_template from vince.models import VulnerabilityCase diff --git a/vince/static/vince/css/style.css b/vince/static/vince/css/style.css index 5ba2061..9c4d7ca 100644 --- a/vince/static/vince/css/style.css +++ b/vince/static/vince/css/style.css @@ -1425,12 +1425,13 @@ div.homelink a { width:100%; } +/* when putting up an announcement-banner, change padding-top for the following two selectors to 195 and 225 respectively: */ #TRoffCanvasLeft { background-color: #282829; border-right: 1px solid #c2c2c2; color: #f1f1f2; - padding-top:195px; + padding-top:130px; } @@ -1438,7 +1439,7 @@ div.homelink a { background-color: #f1f1f2; border-right: 1px solid #c2c2c2; color: #4d4d4f; - padding-top:225px; + padding-top:150px; } /*.position-left.reveal-for-medium ~ .off-canvas-content { diff --git a/vince/static/vince/js/case.js b/vince/static/vince/js/case.js index 8950743..98466da 100644 --- a/vince/static/vince/js/case.js +++ b/vince/static/vince/js/case.js @@ -1264,7 +1264,7 @@ $(document).ready(function() { function contactClickFunction(cell, formatterParams, onRendered) { var val = cell.getValue(); - if (cell.getRow().getData().users == 0) { + if (cell.getRow().getData().users == false) { val = " " + val } if (cell.getRow().getData().alert_tags.length) { @@ -1280,7 +1280,7 @@ $(document).ready(function() { return "This Vendor is tagged with an ALERT Tag: " + cell.getRow().getData().alert_tags[0] } - if (cell.getRow().getData().users == 0) { + if (cell.getRow().getData().users == false) { return "This vendor does not have any VINCE Users"; } else { return "This vendor has VINCE Users"; @@ -1391,7 +1391,7 @@ $(document).ready(function() { } function customNouserfilter(data, filterParams) { - return (data.users == 0); + return (data.users == false); } $(document).on("click", ".reqapproval", function(event) { @@ -1405,7 +1405,9 @@ $(document).ready(function() { $(document).on("click", ".vendorswithnousers", function(event) { event.preventDefault(); - vendors_table.setFilter("users", "=", "0"); + vendors_table.setFilter(function(data){ + return !data.users; + }); }); $(document).on("click", ".vendorapproved", 
function(event) { @@ -1454,7 +1456,7 @@ $(document).ready(function() { total_notified_vendors++ } - if (data[i].users == 0) { + if (data[i].users == false) { total_vendors_no_users++ } if (data[i].seen) { @@ -1492,6 +1494,7 @@ $(document).ready(function() { async function createVendorsTable() { let data = await ajaxVendorData() let vendors_data = data['data'] + console.log(vendors_data) populateFiltersWithValues(vendors_data) let vendors_total = vendors_data.length let pageSizeOptionsArray = [] diff --git a/vince/static/vince/js/contactverify.js b/vince/static/vince/js/contactverify.js index 121beeb..789dbd5 100644 --- a/vince/static/vince/js/contactverify.js +++ b/vince/static/vince/js/contactverify.js @@ -57,7 +57,13 @@ function getEmails(e, taggle) { for (let i=0; i< emails.length; i++) { taggle.add(emails[i]); } - } + }, + error: function(){ + console.log("ajax was erroneous") + }, + complete: function(){ + console.log("ajax was completed") + } }); } @@ -104,6 +110,8 @@ $(document).ready(function() { }); $.getJSON("/vince/ajax_calls/search/", function(data) { + console.log('the data entered into getJSON is'); + console.log(data); contact_auto(data); }); @@ -180,12 +188,13 @@ $(document).ready(function() { // } // } // } - // }); + // }); - // let user_to_verify_field = document.getElementById('id_user'); + // let user_to_verify_field = document.getElementById('id_user'); - // let temporarily_allowed_email = "" - // user_to_verify_field.addEventListener('change', function() { + // let temporarily_allowed_email = "" + // user_to_verify_field.addEventListener('change', function() { + // // remove whatever email was previously allowed as a result of this event listener: // let currently_allowed_emails = taggle.settings.allowedTags // for (let i=0; i < currently_allowed_emails.length; i++){ // if (currently_allowed_emails[i] == temporarily_allowed_email){ @@ -193,13 +202,14 @@ $(document).ready(function() { // taggle.remove(temporarily_allowed_email) // } // } + // // allow the new email and add it to the list of taggles: // temporarily_allowed_email = user_to_verify_field.value // currently_allowed_emails.push(temporarily_allowed_email) // if (internal_verification_checkbox.checked){ // taggle.settings.allowedTags = currently_allowed_emails; // taggle.add(temporarily_allowed_email); // } - // }); + // }); }); diff --git a/vince/static/vince/js/scontact.js b/vince/static/vince/js/scontact.js index ecee4ad..c79a11c 100644 --- a/vince/static/vince/js/scontact.js +++ b/vince/static/vince/js/scontact.js @@ -430,31 +430,31 @@ $(document).ready(function() { }); $(document).on("submit", "#addemailform", function(event) { - event.preventDefault(); - var url = $("#addemailform").attr("action"); - $.ajax({ + event.preventDefault(); + var url = $("#addemailform").attr("action"); + $.ajax({ url: url, type: "POST", - data: $("#addemailform").serialize(), + data: $("#addemailform").serialize(), success: function(data) { - console.log(data); - if (data['ticket']) { - location.href = data['ticket']; - } - else if (data['refresh']) { - window.location.reload(true); - } else if (data['msg_body']){ - $("#vendor-results").html("
" + data['text'] + " Or Request Authorization via Email
") + console.log(data); + if (data['ticket']) { + location.href = data['ticket']; + } + else if (data['refresh']) { + window.location.reload(true); + } else if (data['msg_body']){ + $("#vendor-results").html("" + data['text'] + " Or Request Authorization via Email
") $("#id_msg").val(data['msg_body']); $("#msgvendor").removeClass("hidden"); - } else { - $("#vendor-results").html("" + data['text'] +"
"); - if (data['bypass']) { - $("#vendor-results").append("Request Internal Validation for this Email
"); - } - } + } else { + $("#vendor-results").html("" + data['text'] +"
"); + if (data['bypass']) { + $("#vendor-results").append("Request Internal Validation for this Email
"); + } + } } - }); + }); }); }); diff --git a/vince/templates/vince/base.html b/vince/templates/vince/base.html index 6559584..7afb823 100644 --- a/vince/templates/vince/base.html +++ b/vince/templates/vince/base.html @@ -175,7 +175,7 @@{{ followup.title|escape|email_to_user }} {% if followup.title in "Comment,Closed" and followup.user == user %} {% elif "Email" in followup.title %}{% if followup.email_id %}{% endif %}{% endif %}
-
- {% if followup.comment|is_json %}
-
{{ followup.title|escape|email_to_user }} {% if followup.title in "Comment,Closed" and followup.user == user %} {% elif "Email" in followup.title %}{% if followup.email_id %}{% endif %}{% endif %}
+
+ {% if followup.comment|is_json %}
+