Skip to content

Commit

Permalink
AIP-84 Migrate XCom get entries endpoint to Fastapi (#44366)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeljs-c authored Nov 26, 2024
1 parent 0470bd9 commit a18bcd7
Show file tree
Hide file tree
Showing 13 changed files with 1,256 additions and 393 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from airflow.api_connexion.types import APIResponse


@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.XCOM)
@format_parameters({"limit": check_limit})
@provide_session
Expand Down
7 changes: 7 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,10 @@ class XComResponseString(XComResponse):
@field_validator("value", mode="before")
def value_to_string(cls, v):
return str(v) if v is not None else None


class XComCollection(BaseModel):
"""List of XCom items."""

xcom_entries: list[XComResponse]
total_entries: int
336 changes: 242 additions & 94 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3651,6 +3651,200 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}:
get:
tags:
- XCom
summary: Get Xcom Entry
description: Get an XCom entry.
operationId: get_xcom_entry
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: task_id
in: path
required: true
schema:
type: string
title: Task Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
- name: xcom_key
in: path
required: true
schema:
type: string
title: Xcom Key
- name: map_index
in: query
required: false
schema:
type: integer
minimum: -1
default: -1
title: Map Index
- name: deserialize
in: query
required: false
schema:
type: boolean
default: false
title: Deserialize
- name: stringify
in: query
required: false
schema:
type: boolean
default: true
title: Stringify
responses:
'200':
description: Successful Response
content:
application/json:
schema:
anyOf:
- $ref: '#/components/schemas/XComResponseNative'
- $ref: '#/components/schemas/XComResponseString'
title: Response Get Xcom Entry
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries:
get:
tags:
- XCom
summary: Get Xcom Entries
description: 'Get all XCom entries.
This endpoint allows specifying `~` as the dag_id, dag_run_id, task_id to
retrieve XCom entries for all DAGs.'
operationId: get_xcom_entries
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
- name: task_id
in: path
required: true
schema:
type: string
title: Task Id
- name: xcom_key
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Xcom Key
- name: map_index
in: query
required: false
schema:
anyOf:
- type: integer
minimum: -1
- type: 'null'
title: Map Index
- name: limit
in: query
required: false
schema:
type: integer
minimum: 0
default: 100
title: Limit
- name: offset
in: query
required: false
schema:
type: integer
minimum: 0
default: 0
title: Offset
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/XComCollection'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}:
get:
tags:
Expand Down Expand Up @@ -5199,100 +5393,6 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}:
get:
tags:
- XCom
summary: Get Xcom Entry
description: Get an XCom entry.
operationId: get_xcom_entry
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: task_id
in: path
required: true
schema:
type: string
title: Task Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
- name: xcom_key
in: path
required: true
schema:
type: string
title: Xcom Key
- name: map_index
in: query
required: false
schema:
type: integer
minimum: -1
default: -1
title: Map Index
- name: deserialize
in: query
required: false
schema:
type: boolean
default: false
title: Deserialize
- name: stringify
in: query
required: false
schema:
type: boolean
default: true
title: Stringify
responses:
'200':
description: Successful Response
content:
application/json:
schema:
anyOf:
- $ref: '#/components/schemas/XComResponseNative'
- $ref: '#/components/schemas/XComResponseString'
title: Response Get Xcom Entry
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{try_number}:
get:
tags:
Expand Down Expand Up @@ -8715,6 +8815,54 @@ components:
- git_version
title: VersionInfo
description: Version information serializer for responses.
XComCollection:
properties:
xcom_entries:
items:
$ref: '#/components/schemas/XComResponse'
type: array
title: Xcom Entries
total_entries:
type: integer
title: Total Entries
type: object
required:
- xcom_entries
- total_entries
title: XComCollection
description: List of XCom items.
XComResponse:
properties:
key:
type: string
title: Key
timestamp:
type: string
format: date-time
title: Timestamp
logical_date:
type: string
format: date-time
title: Logical Date
map_index:
type: integer
title: Map Index
task_id:
type: string
title: Task Id
dag_id:
type: string
title: Dag Id
type: object
required:
- key
- timestamp
- logical_date
- map_index
- task_id
- dag_id
title: XComResponse
description: Serializer for a xcom item.
XComResponseNative:
properties:
key:
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/routes/public/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@
authenticated_router.include_router(plugins_router)
authenticated_router.include_router(pools_router)
authenticated_router.include_router(providers_router)
authenticated_router.include_router(xcom_router)
authenticated_router.include_router(task_instances_router)
authenticated_router.include_router(tasks_router)
authenticated_router.include_router(variables_router)
authenticated_router.include_router(xcom_router)
authenticated_router.include_router(task_instances_log_router)


Expand Down
Loading

0 comments on commit a18bcd7

Please sign in to comment.