Skip to content

Report

hypha.apply.projects.models.report

ReportQueryset

Bases: QuerySet

done

done()
Source code in hypha/apply/projects/models/report.py
def done(self):
    """Select reports that need no further action.

    A report matches when it has a ``current`` version attached or when it
    has been flagged as skipped.
    """
    has_live_version = Q(current__isnull=False)
    was_skipped = Q(skipped=True)
    return self.filter(has_live_version | was_skipped)

to_do

to_do()
Source code in hypha/apply/projects/models/report.py
def to_do(self):
    """Select outstanding reports, oldest end date first.

    A report matches when it has no ``current`` version, is not skipped and
    its ``end_date`` lies strictly before today's date.
    """
    current_day = timezone.now().date()
    outstanding = self.filter(
        current__isnull=True,
        skipped=False,
        end_date__lt=current_day,
    )
    return outstanding.order_by("end_date")

any_very_late

any_very_late()
Source code in hypha/apply/projects/models/report.py
def any_very_late(self):
    """Narrow the ``to_do()`` reports to those that ended two weeks ago or earlier."""
    cutoff = timezone.now().date() - relativedelta(weeks=2)
    outstanding = self.to_do()
    return outstanding.filter(end_date__lte=cutoff)

submitted

submitted()
Source code in hypha/apply/projects/models/report.py
def submitted(self):
    """Select reports that have a ``current`` version attached."""
    criteria = {"current__isnull": False}
    return self.filter(**criteria)

for_table

for_table()
Source code in hypha/apply/projects/models/report.py
def for_table(self):
    """Annotate each report with the values needed to show its reporting period.

    Three annotations are added, in this order:

    * ``last_end_date`` - the ``end_date`` of the first report returned for the
      same project with an earlier ``end_date`` (which row comes first depends
      on the ordering applied to ``Report`` queries).
    * ``project_start_date`` - the ``start`` value exposed by
      ``Project.objects...with_start_date()`` for the report's project.
    * ``start`` - the day after ``last_end_date`` when one exists, otherwise
      ``project_start_date``.
    """
    project_model = apps.get_model("application_projects", "Project")

    previous_end = Subquery(
        Report.objects.filter(
            project=OuterRef("project_id"), end_date__lt=OuterRef("end_date")
        ).values("end_date")[:1]
    )
    project_start = Subquery(
        project_model.objects.filter(
            pk=OuterRef("project_id"),
        )
        .with_start_date()
        .values("start")[:1]
    )
    # ExpressionWrapper does not cast the calculated value by itself, so wrap
    # it in Cast to end up with an actual date.
    day_after_previous_end = Cast(
        ExpressionWrapper(
            F("last_end_date") + datetime.timedelta(days=1),
            output_field=models.DateTimeField(),
        ),
        models.DateField(),
    )
    period_start = Case(
        When(last_end_date__isnull=False, then=day_after_previous_end),
        default=F("project_start_date"),
        output_field=models.DateField(),
    )

    # ``start`` refers to the two annotations before it, so keep this order.
    return self.annotate(
        last_end_date=previous_end,
        project_start_date=project_start,
        start=period_start,
    )

Report

Bases: Model

skipped class-attribute instance-attribute

skipped = BooleanField(default=False)

end_date class-attribute instance-attribute

end_date = DateField()

project class-attribute instance-attribute

project = ForeignKey('Project', on_delete=CASCADE, related_name='reports')

submitted class-attribute instance-attribute

submitted = DateTimeField(null=True)

notified class-attribute instance-attribute

notified = DateTimeField(null=True)

current class-attribute instance-attribute

current = OneToOneField('ReportVersion', on_delete=CASCADE, related_name='live_for_report', null=True)

draft class-attribute instance-attribute

draft = OneToOneField('ReportVersion', on_delete=CASCADE, related_name='draft_for_report', null=True)

objects class-attribute instance-attribute

objects = as_manager()

wagtail_reference_index_ignore class-attribute instance-attribute

wagtail_reference_index_ignore = True

previous property

previous

next property

next

past_due property

past_due

is_very_late property

is_very_late

can_submit property

can_submit

submitted_date property

submitted_date

Meta

ordering class-attribute instance-attribute
ordering = ('-end_date',)

get_absolute_url

get_absolute_url()
Source code in hypha/apply/projects/models/report.py
def get_absolute_url(self):
    """Return the URL of this report's detail view, resolved from its primary key."""
    url_kwargs = {"pk": self.pk}
    return reverse("apply:projects:reports:detail", kwargs=url_kwargs)

start_date

start_date()
Source code in hypha/apply/projects/models/report.py
@cached_property
def start_date(self):
    """Return the first day covered by this report.

    This is the day after the ``end_date`` of the first report of the same
    project that ends before this one; when no such report exists the
    project's own ``start_date`` is returned.
    """
    earlier_report = self.project.reports.filter(end_date__lt=self.end_date).first()
    if not earlier_report:
        return self.project.start_date

    return earlier_report.end_date + relativedelta(days=1)

ReportVersion

Bases: BaseStreamForm, AccessFormData, Model

stream_file_class class-attribute instance-attribute

stream_file_class = PrivateStreamFieldFile

storage_class class-attribute instance-attribute

storage_class = PrivateStorage

raw_data property

raw_data

question_field_ids property

question_field_ids

file_field_ids property

file_field_ids

question_text_field_ids property

question_text_field_ids

first_group_question_text_field_ids property

first_group_question_text_field_ids

raw_fields property

raw_fields

fields property

fields

named_blocks property

named_blocks

normal_blocks property

normal_blocks

group_toggle_blocks property

group_toggle_blocks

first_group_normal_text_blocks property

first_group_normal_text_blocks

submission_form_class class-attribute instance-attribute

submission_form_class = PageStreamBaseForm

report class-attribute instance-attribute

report = ForeignKey('Report', on_delete=CASCADE, related_name='versions')

submitted class-attribute instance-attribute

submitted = DateTimeField()

form_fields class-attribute instance-attribute

form_fields = StreamField(ProjectFormCustomFormFieldsBlock(), use_json_field=True)

form_data class-attribute instance-attribute

form_data = JSONField(encoder=StreamFieldDataEncoder, default=dict)

draft class-attribute instance-attribute

draft = BooleanField()

author class-attribute instance-attribute

author = ForeignKey(AUTH_USER_MODEL, on_delete=SET_NULL, related_name='reports', null=True)

wagtail_reference_index_ignore class-attribute instance-attribute

wagtail_reference_index_ignore = True

stream_file classmethod

stream_file(instance, field, file)
Source code in hypha/apply/funds/models/mixins.py
@classmethod
def stream_file(cls, instance, field, file):
    """Wrap a single stored or uploaded file value in ``cls.stream_file_class``.

    Returns an empty list for a falsy value and returns the value untouched
    when it already is a ``stream_file_class`` instance. ``File`` and
    ``PlaceholderUploadedFile`` objects are wrapped directly; anything else is
    treated as a mapping with ``name`` / ``filename`` (or legacy ``path``) keys.
    """
    if not file:
        return []

    wrapper = cls.stream_file_class
    if isinstance(file, wrapper):
        return file

    if isinstance(file, File):
        return wrapper(
            instance, field, file, name=file.name, storage=cls.storage_class()
        )

    if isinstance(file, PlaceholderUploadedFile):
        naming = {"name": file.file_id, "filename": file.name}
    else:
        # This fixes a backwards compatibility issue with #507
        # Once every application has been re-saved it should be possible to remove it
        if "path" in file:
            file["filename"] = file["name"]
            file["name"] = file["path"]
        naming = {"name": file["name"], "filename": file.get("filename")}

    return wrapper(
        instance,
        field,
        None,
        storage=cls.storage_class(),
        **naming,
    )

process_file classmethod

process_file(instance, field, file)
Source code in hypha/apply/funds/models/mixins.py
@classmethod
def process_file(cls, instance, field, file):
    """Run ``cls.stream_file`` over one value, or over every item when given a list."""
    if not isinstance(file, list):
        return cls.stream_file(instance, field, file)
    return [cls.stream_file(instance, field, item) for item in file]

process_file_data

process_file_data(data)
Source code in hypha/apply/funds/models/mixins.py
def process_file_data(self, data):
    """Save the files submitted for every uploadable field and store them in ``form_data``.

    For each form field whose block is an ``UploadableMediaBlock`` the value
    found in ``data`` (default: empty list) is passed through
    ``process_file``, saved, and written to ``self.form_data`` under the
    field id.
    """
    for form_field in self.form_fields:
        if not isinstance(form_field.block, UploadableMediaBlock):
            continue

        processed = self.process_file(self, form_field, data.get(form_field.id, []))
        try:
            processed.save()
        except (AttributeError, FileNotFoundError):
            # No usable ``save`` on the value itself: try saving its items
            # one by one, ignoring files that cannot be found.
            try:
                for single_file in processed:
                    single_file.save()
            except FileNotFoundError:
                pass
        self.form_data[form_field.id] = processed

extract_files

extract_files()
Source code in hypha/apply/funds/models/mixins.py
def extract_files(self):
    """Remove the uploadable-field values from ``form_data`` and return them.

    Returns a dict mapping each ``UploadableMediaBlock`` field id to its
    current value (an empty list when there is none).
    """
    extracted = {}
    for form_field in self.form_fields:
        if not isinstance(form_field.block, UploadableMediaBlock):
            continue
        key = form_field.id
        extracted[key] = self.data(key) or []
        self.form_data.pop(key, None)
    return extracted

from_db classmethod

from_db(db, field_names, values)
Source code in hypha/apply/stream_forms/models.py
@classmethod
def from_db(cls, db, field_names, values):
    """Build the instance via the parent ``from_db`` and decode its ``form_data``.

    Decoding through ``deserialize_form_data`` only happens when
    ``form_data`` is among the loaded ``field_names``.
    """
    instance = super().from_db(db, field_names, values)
    if "form_data" not in field_names:
        return instance

    instance.form_data = cls.deserialize_form_data(
        instance, instance.form_data, instance.form_fields
    )
    return instance

deserialised_data classmethod

deserialised_data(instance, data, form_fields)
Source code in hypha/apply/funds/models/mixins.py
@classmethod
def deserialised_data(cls, instance, data, form_fields):
    """Return a copy of ``data`` in which file dicts are turned into file objects.

    Only entries belonging to ``UploadableMediaBlock`` fields that carry an
    id are converted (through ``process_file``); ``data`` itself is left
    untouched.
    """
    converted = data.copy()
    # PERFORMANCE NOTE:
    # Do not attempt to iterate over form_fields - that will fully instantiate the form_fields
    # including any sub queries that they do
    for position, raw_field in enumerate(form_fields.raw_data):
        block = form_fields.stream_block.child_blocks[raw_field["type"]]
        if not isinstance(block, UploadableMediaBlock):
            continue
        field_id = raw_field.get("id")
        if not field_id:
            continue
        stream_field = form_fields[position]
        stored = converted.get(field_id, [])
        converted[field_id] = cls.process_file(instance, stream_field, stored)
    return converted

get_definitive_id

get_definitive_id(id)
Source code in hypha/apply/funds/models/mixins.py
def get_definitive_id(self, id):
    """Translate a named-block alias into its real field id.

    Ids that are not keys of ``named_blocks`` are returned unchanged.
    """
    return self.named_blocks.get(id, id)

field

field(id)
Source code in hypha/apply/funds/models/mixins.py
def field(self, id):
    """Return the raw field stored under ``id`` (or under the id it is an alias for).

    Raises:
        UnusedFieldException: when ``raw_fields`` has no entry for the id.
    """
    resolved_id = self.get_definitive_id(id)
    try:
        found = self.raw_fields[resolved_id]
    except KeyError:
        raise UnusedFieldException(id) from None
    else:
        return found

data

data(id)
Source code in hypha/apply/funds/models/mixins.py
def data(self, id):
    """Return the stored answer for ``id`` (or the id it aliases), or ``None`` if absent."""
    resolved_id = self.get_definitive_id(id)
    try:
        answer = self.raw_data[resolved_id]
    except KeyError:
        # We have most likely progressed application forms so the data isn't in form_data
        answer = None
    return answer

get_serialize_multi_inputs_answer

get_serialize_multi_inputs_answer(field)
Source code in hypha/apply/funds/models/mixins.py
def get_serialize_multi_inputs_answer(self, field):
    """Join the non-empty answers of a multi-input field into one ", "-separated string.

    The answers are looked up under ``<field.id>_<index>`` for every index
    below the field's ``number_of_inputs`` value.
    """
    input_count = field.value.get("number_of_inputs")
    collected = []
    for position in range(input_count):
        collected.append(self.data(field.id + "_" + str(position)))
    return ", ".join(value for value in collected if value)

serialize

serialize(field_id)
Source code in hypha/apply/funds/models/mixins.py
def serialize(self, field_id):
    """Render a field with a ``serialize`` context and return the result.

    Multi-input fields get their combined answer from
    ``get_serialize_multi_inputs_answer``; every other field uses ``data``.
    """
    field = self.field(field_id)
    if isinstance(field.block, MultiInputCharFieldBlock):
        answer = self.get_serialize_multi_inputs_answer(field)
    else:
        answer = self.data(field_id)

    render_context = {
        "serialize": True,
        "data": answer,
    }
    return field.render(context=render_context)

get_multi_inputs_answer

get_multi_inputs_answer(field, include_question=False)
Source code in hypha/apply/funds/models/mixins.py
def get_multi_inputs_answer(self, field, include_question=False):
    """Render every non-empty answer of a multi-input field into one string.

    Each answer is rendered separately; only the first rendered answer may
    include the question. All ``</section>`` closers are stripped from the
    joined output and a single ``</section>`` is appended at the end.
    """
    input_count = field.value.get("number_of_inputs")
    every_answer = [
        self.data(field.id + "_" + str(position)) for position in range(input_count)
    ]
    given_answers = [answer for answer in every_answer if answer]

    pieces = []
    for position, answer in enumerate(given_answers):
        show_question = include_question if position == 0 else False
        pieces.append(
            field.render(
                context={
                    "data": answer,
                    "include_question": show_question,
                }
            )
        )

    merged = "".join(pieces)
    return merged.replace("</section>", "") + "</section>"

render_answer

render_answer(field_id, include_question=False)
Source code in hypha/apply/funds/models/mixins.py
def render_answer(self, field_id, include_question=False):
    """Render the answer stored for ``field_id``.

    Returns ``"-"`` when the field is not part of the form. Multi-input
    fields are delegated to ``get_multi_inputs_answer``; for all other
    fields a falsy answer is rendered as an empty string.
    """
    try:
        field = self.field(field_id)
    except UnusedFieldException:
        return "-"

    if isinstance(field.block, MultiInputCharFieldBlock):
        return self.get_multi_inputs_answer(field, include_question)

    answer = self.data(field_id)
    # Some migrated content have empty address, so fall back to "".
    return field.render(
        context={"data": answer or "", "include_question": include_question}
    )

render_answers

render_answers()
Source code in hypha/apply/funds/models/mixins.py
def render_answers(self):
    """Return a list with the rendered answer (question included) of every normal block."""
    rendered = []
    for block_id in self.normal_blocks:
        rendered.append(self.render_answer(block_id, include_question=True))
    return rendered

render_first_group_text_answers

render_first_group_text_answers()
Source code in hypha/apply/funds/models/mixins.py
def render_first_group_text_answers(self):
    """Return the rendered answers (question included) of ``first_group_normal_text_blocks``."""
    rendered = []
    for block_id in self.first_group_normal_text_blocks:
        rendered.append(self.render_answer(block_id, include_question=True))
    return rendered

render_text_blocks_answers

render_text_blocks_answers()
Source code in hypha/apply/funds/models/mixins.py
def render_text_blocks_answers(self):
    """Return the rendered answers of the text questions that are not named blocks."""
    rendered = []
    for block_id in self.question_text_field_ids:
        if block_id in self.named_blocks:
            continue
        rendered.append(self.render_answer(block_id, include_question=True))
    return rendered

output_answers

output_answers()
Source code in hypha/apply/funds/models/mixins.py
def output_answers(self):
    """Return all rendered answers joined into one string that is marked safe."""
    joined = "".join(self.render_answers())
    return mark_safe(joined)

output_text_answers

output_text_answers()
Source code in hypha/apply/funds/models/mixins.py
def output_text_answers(self):
    """Return the rendered text-question answers joined into one string that is marked safe."""
    joined = "".join(self.render_text_blocks_answers())
    return mark_safe(joined)

output_first_group_text_answers

output_first_group_text_answers()
Source code in hypha/apply/funds/models/mixins.py
def output_first_group_text_answers(self):
    """Return the rendered first-group text answers joined into one string that is marked safe."""
    joined = "".join(self.render_first_group_text_answers())
    return mark_safe(joined)

get_answer_from_label

get_answer_from_label(label)
Source code in hypha/apply/funds/models/mixins.py
def get_answer_from_label(self, label):
    """Find the answer to the first text question whose wording contains ``label``.

    The match is case-insensitive and skips named blocks. List answers are
    joined with commas. Empty answers and the literal answer ``"N"`` are
    ignored; ``None`` is returned when nothing qualifies.
    """
    for field_id in self.question_text_field_ids:
        if field_id in self.named_blocks:
            continue

        serialized = self.serialize(field_id)
        if label.lower() not in serialized["question"].lower():
            continue

        raw_answer = serialized["answer"]
        answer = raw_answer if isinstance(raw_answer, str) else ",".join(raw_answer)
        if answer and answer != "N":
            return answer
    return None

deserialize_form_data classmethod

deserialize_form_data(instance, form_data, form_fields)
Source code in hypha/apply/stream_forms/models.py
@classmethod
def deserialize_form_data(cls, instance, form_data, form_fields):
    """Return a copy of ``form_data`` with each stored value decoded by its block.

    Values whose field id is not present in ``form_data`` are skipped;
    ``form_data`` itself is not modified.
    """
    decoded = form_data.copy()
    # PERFORMANCE NOTE:
    # Do not attempt to iterate over form_fields - that will fully instantiate the form_fields
    # including any sub queries that they do
    for raw_field in form_fields.raw_data:
        block = form_fields.stream_block.child_blocks[raw_field["type"]]
        field_id = raw_field.get("id")
        if field_id in decoded:
            decoded[field_id] = block.decode(decoded[field_id])
    return decoded

get_defined_fields

get_defined_fields()
Source code in hypha/apply/stream_forms/models.py
def get_defined_fields(self):
    """Return the stream of field definitions this form is built from (``form_fields``)."""
    defined_fields = self.form_fields
    return defined_fields

get_form_fields

get_form_fields(draft=False, form_data=None, user=None)
Source code in hypha/apply/stream_forms/models.py
def get_form_fields(self, draft=False, form_data=None, user=None):
    """Build the ordered mapping of form fields for this stream form.

    Walks the blocks returned by ``get_defined_fields()`` and turns each one
    into an entry of the result, keyed by the block id (multi-input blocks
    produce one entry per input, keyed ``<id>_<index>``).

    Args:
        draft: When true, every field whose block is not an
            ``ApplicationMustIncludeFieldBlock`` subclass is made optional.
        form_data: Already submitted values; only consulted to find out
            whether the first choice of a group toggle is selected.
            Defaults to an empty dict.
        user: When given and authenticated, full-name and email fields are
            pre-filled from it (see the inline comments for the details).

    Returns:
        An ``OrderedDict`` of form fields / ``BlockFieldWrapper`` objects in
        block order.
    """
    if form_data is None:
        form_data = {}

    form_fields = OrderedDict()
    field_blocks = self.get_defined_fields()
    # Group bookkeeping: fields outside any group get number 1, every group
    # toggle encountered outside a group starts the next group number.
    group_counter = 1
    is_in_group = False

    # If true option 1 is selected
    grouped_fields_visible = False
    for struct_child in field_blocks:
        block = struct_child.block
        struct_value = struct_child.value
        if isinstance(block, FormFieldBlock):
            field_from_block = block.get_field(struct_value)
            disabled_help_text = _(
                "You are logged in so this information is fetched from your user account."
            )
            # Full name: locked to the account value when the account has one,
            # otherwise left editable with an explanatory help text.
            if isinstance(block, FullNameBlock) and user and user.is_authenticated:
                if user.full_name:
                    field_from_block.disabled = True
                    field_from_block.initial = user.full_name
                    field_from_block.help_text = disabled_help_text
                else:
                    field_from_block.help_text = _(
                        "You are logged in but your user account does not have a "
                        "full name. We'll update your user account with the name you provide here."
                    )
            # Email: always locked to the account email for authenticated users.
            if isinstance(block, EmailBlock) and user and user.is_authenticated:
                field_from_block.disabled = True
                field_from_block.initial = user.email
                field_from_block.help_text = disabled_help_text
            if draft and not issubclass(
                block.__class__, ApplicationMustIncludeFieldBlock
            ):
                field_from_block.required = False
            field_from_block.help_link = struct_value.get("help_link")
            field_from_block.group_number = group_counter if is_in_group else 1
            # A toggle met while already inside a group does not open a new one.
            if isinstance(block, GroupToggleBlock) and not is_in_group:
                field_from_block.group_number = 1
                field_from_block.grouper_for = group_counter + 1
                group_counter += 1
                is_in_group = True
                # The group is shown only when the submitted value equals the
                # toggle's first choice.
                grouped_fields_visible = (
                    form_data.get(struct_child.id) == field_from_block.choices[0][0]
                )
            if isinstance(block, TextFieldBlock):
                field_from_block.word_limit = struct_value.get("word_limit")
            if isinstance(block, MultiInputCharFieldBlock):
                number_of_inputs = struct_value.get("number_of_inputs")
                for index in range(number_of_inputs):
                    # The entry is stored first and configured afterwards; the
                    # attributes below are set on that same stored object.
                    form_fields[struct_child.id + "_" + str(index)] = (
                        field_from_block
                    )
                    field_from_block.multi_input_id = struct_child.id
                    field_from_block.add_button_text = struct_value.get(
                        "add_button_text"
                    )
                    if (
                        index == number_of_inputs - 1
                    ):  # Add button after last input field
                        field_from_block.multi_input_add_button = True
                        # Index for field until which fields will be visible to applicant.
                        # Initially only the first field with id UUID_0 will be visible.
                        field_from_block.visibility_index = 0
                        field_from_block.max_index = index
                    if index != 0:
                        field_from_block.multi_input_field = True
                        field_from_block.required = False
                        field_from_block.initial = None
                    # Shallow copy so the next input starts from this one's
                    # attributes without sharing the same object.
                    field_from_block = copy.copy(field_from_block)
            else:
                if is_in_group and not isinstance(block, GroupToggleBlock):
                    # Remember the configured requirement, but only enforce it
                    # while the group is visible.
                    field_from_block.required_when_visible = (
                        field_from_block.required
                    )
                    field_from_block.required = (
                        field_from_block.required & grouped_fields_visible
                    )
                    field_from_block.visible = grouped_fields_visible
                form_fields[struct_child.id] = field_from_block
        elif isinstance(block, GroupToggleEndBlock):
            # Group toggle end block is used only to group fields and not used in actual form.
            # Todo: Use streamblock to create nested form field blocks, a more elegant method to group form fields.
            is_in_group = False
        else:
            # Any other block is wrapped rather than converted into a form field.
            field_wrapper = BlockFieldWrapper(struct_child)
            field_wrapper.group_number = group_counter if is_in_group else 1
            form_fields[struct_child.id] = field_wrapper

    return form_fields

get_form_class

get_form_class(draft=False, form_data=None, user=None)
Source code in hypha/apply/stream_forms/models.py
def get_form_class(self, draft=False, form_data=None, user=None):
    """Create a ``WagtailStreamForm`` class on the fly.

    The class derives from ``submission_form_class`` and uses the mapping
    returned by ``get_form_fields(draft, form_data, user)`` as its attributes.
    """
    bases = (self.submission_form_class,)
    attributes = self.get_form_fields(draft, form_data, user)
    return type("WagtailStreamForm", bases, attributes)

ReportPrivateFiles

Bases: Model

report class-attribute instance-attribute

report = ForeignKey('ReportVersion', on_delete=CASCADE, related_name='files')

document class-attribute instance-attribute

document = FileField(upload_to=report_path, storage=PrivateStorage())

wagtail_reference_index_ignore class-attribute instance-attribute

wagtail_reference_index_ignore = True

filename property

filename

get_absolute_url

get_absolute_url()
Source code in hypha/apply/projects/models/report.py
def get_absolute_url(self):
    """Return the URL of the document view for this file.

    The URL is resolved from the ``report_id`` of the linked report version
    (``pk``) and this file's own primary key (``file_pk``).
    """
    url_kwargs = {"pk": self.report.report_id, "file_pk": self.pk}
    return reverse("apply:projects:reports:document", kwargs=url_kwargs)

ReportConfig

Bases: Model

Persists configuration about the reporting schedule etc

WEEK class-attribute instance-attribute

WEEK = gettext_lazy('week')

MONTH class-attribute instance-attribute

MONTH = gettext_lazy('month')

YEAR class-attribute instance-attribute

YEAR = gettext_lazy('year')

FREQUENCY_CHOICES class-attribute instance-attribute

FREQUENCY_CHOICES = [(WEEK, gettext_lazy('Weeks')), (MONTH, gettext_lazy('Months')), (YEAR, gettext_lazy('Years'))]

project class-attribute instance-attribute

project = OneToOneField('Project', on_delete=CASCADE, related_name='report_config')

schedule_start class-attribute instance-attribute

schedule_start = DateField(null=True)

occurrence class-attribute instance-attribute

occurrence = PositiveSmallIntegerField(default=1)

frequency class-attribute instance-attribute

frequency = CharField(choices=FREQUENCY_CHOICES, default=MONTH, max_length=6)

disable_reporting class-attribute instance-attribute

disable_reporting = BooleanField(default=False)

does_not_repeat class-attribute instance-attribute

does_not_repeat = BooleanField(default=False)

get_frequency_display

get_frequency_display()
Source code in hypha/apply/projects/models/report.py
def get_frequency_display(self):
    """Return a translated, human-readable description of the reporting schedule.

    The result depends on the configuration, checked in this order:

    * reporting disabled -> "Reporting Disabled"
    * one-time schedule -> the date of the report already made, or the
      scheduled date when there is none yet
    * yearly / monthly schedule -> the day (and month) taken from the current
      due report, or "last day" when ``schedule_start`` falls on the 31st
    * anything else -> treated as weekly, using the weekday of the current
      due report
    """
    if self.disable_reporting:
        return _("Reporting Disabled")
    if self.does_not_repeat:
        last_report = self.last_report()
        # Translate the message template first and interpolate afterwards:
        # formatting before calling ``_`` makes the lookup key contain the
        # concrete date, so it can never match an entry in the catalogue.
        if last_report:
            return _("One time, that already has reported on {date}").format(
                date=last_report.end_date.strftime("%d %B, %Y")
            )
        # NOTE(review): schedule_start is nullable; this line raises
        # AttributeError when it is unset - confirm callers guarantee a value.
        return _("One time on {date}").format(
            date=self.schedule_start.strftime("%d %B, %Y")
        )
    # NOTE(review): current_due_report() returns None for a project without a
    # start date; the ``next_report.end_date`` accesses below would then fail.
    next_report = self.current_due_report()

    if self.frequency == self.YEAR:
        if self.schedule_start and self.schedule_start.day == 31:
            day_of_month = _("last day")
            month = self.schedule_start.strftime("%B")
        else:
            day_of_month = ordinal(next_report.end_date.day)
            month = next_report.end_date.strftime("%B")
        if self.occurrence == 1:
            return _("Once a year on {month} {day}").format(
                day=day_of_month, month=month
            )
        return _("Every {occurrence} years on {month} {day}").format(
            occurrence=self.occurrence, day=day_of_month, month=month
        )

    if self.frequency == self.MONTH:
        if self.schedule_start and self.schedule_start.day == 31:
            day_of_month = _("last day")
        else:
            day_of_month = ordinal(next_report.end_date.day)
        if self.occurrence == 1:
            return _("Once a month on the {day}").format(day=day_of_month)
        return _("Every {occurrence} months on the {day}").format(
            occurrence=self.occurrence, day=day_of_month
        )

    weekday = next_report.end_date.strftime("%A")

    if self.occurrence == 1:
        return _("Once a week on {weekday}").format(weekday=weekday)
    return _("Every {occurrence} weeks on {weekday}").format(
        occurrence=self.occurrence, weekday=weekday
    )

is_up_to_date

is_up_to_date()
Source code in hypha/apply/projects/models/report.py
def is_up_to_date(self):
    """Return ``True`` when the project has no reports left in ``to_do()``."""
    # exists() lets the database answer the yes/no question instead of
    # loading every outstanding report just to measure the list.
    return not self.project.reports.to_do().exists()

outstanding_reports

outstanding_reports()
Source code in hypha/apply/projects/models/report.py
def outstanding_reports(self):
    """Return how many of the project's reports are in ``to_do()``."""
    # count() has the database do the counting instead of loading every
    # outstanding report into memory.
    return self.project.reports.to_do().count()

has_very_late_reports

has_very_late_reports()
Source code in hypha/apply/projects/models/report.py
def has_very_late_reports(self):
    """Return the project's ``any_very_late()`` reports (truthy when there are any)."""
    project_reports = self.project.reports
    return project_reports.any_very_late()

past_due_reports

past_due_reports()
Source code in hypha/apply/projects/models/report.py
def past_due_reports(self):
    """Return the project's reports that are still in ``to_do()``."""
    project_reports = self.project.reports
    return project_reports.to_do()

last_report

last_report()
Source code in hypha/apply/projects/models/report.py
def last_report(self):
    """Return the first of the project's reports that counts as already handled.

    A report qualifies when at least one of these holds:

    - its end date lies before today,
    - it was skipped,
    - it has a ``submitted`` timestamp.

    ``None`` is returned when no report qualifies. Which report comes first
    is determined by the ordering of the ``reports`` queryset.
    """
    current_day = timezone.now().date()
    already_ended = Q(end_date__lt=current_day)
    was_skipped = Q(skipped=True)
    was_submitted = Q(submitted__isnull=False)
    return self.project.reports.filter(
        already_ended | was_skipped | was_submitted
    ).first()

current_due_report

current_due_report()
Source code in hypha/apply/projects/models/report.py
def current_due_report(self):
    """Return the report that is currently due, creating or updating it as needed.

    Returns ``None`` when reporting is disabled, when the project has no
    start date, or when the schedule is one-time and a last report already
    exists. Otherwise the next due date is worked out from the last report
    and the schedule, and the matching open report is fetched through
    ``update_or_create`` with that date as its ``end_date``.
    """
    if self.disable_reporting:
        return None

    # Project not started - no reporting required
    if not self.project.start_date:
        return None

    today = timezone.now().date()

    last_report = self.last_report()

    # Fall back to the project start when no explicit schedule start is set.
    schedule_date = self.schedule_start or self.project.start_date

    if last_report:
        # Frequency is one time and last report exists - no reporting required anymore
        if self.does_not_repeat:
            return None

        if last_report.end_date < schedule_date:
            # reporting schedule changed schedule_start is now the next report date
            next_due_date = schedule_date
        else:
            # we've had a report since the schedule date so base next deadline from the report
            next_due_date = self.next_date(last_report.end_date)
    else:
        # first report required
        if self.schedule_start and self.schedule_start >= today:
            # Schedule changed since project inception
            next_due_date = self.schedule_start
        else:
            # schedule_start is the first day the project so the "last" period
            # ended one day before that. If date is in past we required report now
            if self.does_not_repeat:
                next_due_date = today
            else:
                next_due_date = max(
                    self.next_date(schedule_date - relativedelta(days=1)),
                    today,
                )

    # Look for an open report (no current version, not skipped) ending today
    # or later and set its end date to the computed due date. Only the report
    # itself is used from the returned pair; the second element is ignored.
    report, _ = self.project.reports.update_or_create(
        project=self.project,
        current__isnull=True,
        skipped=False,
        end_date__gte=today,
        defaults={"end_date": next_due_date},
    )
    return report

current_report

current_report()

This is different from current_due_report as it will return a completed report if that one is the current one.

Source code in hypha/apply/projects/models/report.py
def current_report(self):
    """Return the report for the current period, whether completed or not.

    Unlike ``current_due_report`` this hands back the last report when its
    end date is today or later; only otherwise does it fall back to
    ``current_due_report()``.
    """
    current_day = timezone.now().date()
    latest = self.last_report()

    if not latest or latest.end_date < current_day:
        return self.current_due_report()

    return latest

next_date

next_date(last_date)
Source code in hypha/apply/projects/models/report.py
def next_date(self, last_date):
    """Return ``last_date`` moved forward by one reporting interval.

    The interval is ``occurrence`` units of ``frequency``; the unit name is
    pluralised with an "s" to form the ``relativedelta`` keyword.
    """
    unit = self.frequency + "s"
    interval = relativedelta(**{unit: self.occurrence})
    return last_date + interval

report_path

report_path(instance, filename)
Source code in hypha/apply/projects/models/report.py
def report_path(instance, filename):
    """Build the storage path for a file attached to a report version.

    The path has the form ``reports/<report id>/version/<version id>/<filename>``,
    where the report id is read from ``instance.report.report_id`` and the
    version id from ``instance.report_id``.
    """
    parent_report_id = instance.report.report_id
    version_id = instance.report_id
    return f"reports/{parent_report_id}/version/{version_id}/{filename}"