Skip to content

Commit

Permalink
Merge pull request #3693 from clumens/async-remote-cib-2.1
Browse files Browse the repository at this point in the history
Backport async remote CIB patches to 2.1
  • Loading branch information
kgaillot authored Oct 8, 2024
2 parents 9a63d3a + ec1fad4 commit 3a24ae5
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 39 deletions.
29 changes: 15 additions & 14 deletions daemons/based/based_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -445,13 +445,8 @@ cib_remote_msg(gpointer data)
xmlNode *command = NULL;
pcmk__client_t *client = data;
int rc;
int timeout = 1000;
const char *client_name = pcmk__client_name(client);

if (pcmk_is_set(client->flags, pcmk__client_authenticated)) {
timeout = -1;
}

crm_trace("Remote %s message received for client %s",
pcmk__client_type_str(PCMK__CLIENT_TYPE(client)), client_name);

Expand Down Expand Up @@ -484,7 +479,20 @@ cib_remote_msg(gpointer data)
}
#endif

rc = pcmk__read_remote_message(client->remote, timeout);
rc = pcmk__read_available_remote_data(client->remote);
switch (rc) {
case pcmk_rc_ok:
break;

case EAGAIN:
/* We haven't read the whole message yet */
return 0;

default:
/* Error */
crm_trace("Error reading from remote client: %s", pcmk_rc_str(rc));
return -1;
}

/* must pass auth before we will process anything else */
if (!pcmk_is_set(client->flags, pcmk__client_authenticated)) {
Expand Down Expand Up @@ -521,17 +529,10 @@ cib_remote_msg(gpointer data)
}

command = pcmk__remote_message_xml(client->remote);
while (command) {
if (command != NULL) {
crm_trace("Remote message received from client %s", client_name);
cib_handle_remote_msg(client, command);
free_xml(command);
command = pcmk__remote_message_xml(client->remote);
}

if (rc == ENOTCONN) {
crm_trace("Remote CIB client %s disconnected while reading from it",
client_name);
return -1;
}

return 0;
Expand Down
1 change: 1 addition & 0 deletions include/crm/common/remote_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ typedef struct pcmk__remote_s pcmk__remote_t;

int pcmk__remote_send_xml(pcmk__remote_t *remote, const xmlNode *msg);
int pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms);
int pcmk__read_available_remote_data(pcmk__remote_t *remote);
int pcmk__read_remote_message(pcmk__remote_t *remote, int timeout_ms);
xmlNode *pcmk__remote_message_xml(pcmk__remote_t *remote);
int pcmk__connect_remote(const char *host, int port, int timeout_ms,
Expand Down
92 changes: 70 additions & 22 deletions lib/cib/cib_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ typedef struct cib_remote_opaque_s {
pcmk__remote_t command;
pcmk__remote_t callback;
pcmk__output_t *out;
time_t start_time;
int timeout_sec;
} cib_remote_opaque_t;

static int
Expand Down Expand Up @@ -207,35 +209,60 @@ cib_remote_callback_dispatch(gpointer user_data)
cib_remote_opaque_t *private = cib->variant_opaque;

xmlNode *msg = NULL;
const char *type = NULL;

crm_info("Message on callback channel");

rc = pcmk__read_remote_message(&private->callback, -1);

msg = pcmk__remote_message_xml(&private->callback);
while (msg) {
const char *type = crm_element_value(msg, PCMK__XA_T);
/* If start time is 0, we've previously handled a complete message and this
* connection is being reused for a new message. Reset the start_time,
* giving this new message timeout_sec from now to complete.
*/
if (private->start_time == 0) {
private->start_time = time(NULL);
}

crm_trace("Activating %s callbacks...", type);
rc = pcmk__read_available_remote_data(&private->callback);
switch (rc) {
case pcmk_rc_ok:
/* We have the whole message so process it */
break;

if (pcmk__str_eq(type, PCMK__VALUE_CIB, pcmk__str_none)) {
cib_native_callback(cib, msg, 0, 0);
case EAGAIN:
/* Have we timed out? */
if (time(NULL) >= private->start_time + private->timeout_sec) {
crm_info("Error reading from CIB manager connection: %s",
pcmk_rc_str(ETIME));
return -1;
}

/* We haven't read the whole message yet */
return 0;

default:
/* Error */
crm_info("Error reading from CIB manager connection: %s",
pcmk_rc_str(rc));
return -1;
}

} else if (pcmk__str_eq(type, PCMK__VALUE_CIB_NOTIFY, pcmk__str_none)) {
g_list_foreach(cib->notify_list, cib_native_notify, msg);
msg = pcmk__remote_message_xml(&private->callback);
if (msg == NULL) {
private->start_time = 0;
return 0;
}

} else {
crm_err("Unknown message type: %s", type);
}
type = crm_element_value(msg, PCMK__XA_T);

free_xml(msg);
msg = pcmk__remote_message_xml(&private->callback);
}
crm_trace("Activating %s callbacks...", type);

if (rc == ENOTCONN) {
return -1;
if (pcmk__str_eq(type, PCMK__VALUE_CIB, pcmk__str_none)) {
cib_native_callback(cib, msg, 0, 0);
} else if (pcmk__str_eq(type, PCMK__VALUE_CIB_NOTIFY, pcmk__str_none)) {
g_list_foreach(cib->notify_list, cib_native_notify, msg);
} else {
crm_err("Unknown message type: %s", type);
}

free_xml(msg);
private->start_time = 0;
return 0;
}

Expand All @@ -246,15 +273,35 @@ cib_remote_command_dispatch(gpointer user_data)
cib_t *cib = user_data;
cib_remote_opaque_t *private = cib->variant_opaque;

rc = pcmk__read_remote_message(&private->command, -1);
/* See cib_remote_callback_dispatch */
if (private->start_time == 0) {
private->start_time = time(NULL);
}

rc = pcmk__read_available_remote_data(&private->command);
if (rc == EAGAIN) {
/* Have we timed out? */
if (time(NULL) >= private->start_time + private->timeout_sec) {
crm_info("Error reading from CIB manager connection: %s",
pcmk_rc_str(ETIME));
return -1;
}

/* We haven't read the whole message yet */
return 0;
}

free(private->command.buffer);
private->command.buffer = NULL;
crm_err("received late reply for remote cib connection, discarding");

if (rc == ENOTCONN) {
if (rc != pcmk_rc_ok) {
crm_info("Error reading from CIB manager connection: %s",
pcmk_rc_str(rc));
return -1;
}

private->start_time = 0;
return 0;
}

Expand Down Expand Up @@ -426,6 +473,7 @@ cib_tls_signon(cib_t *cib, pcmk__remote_t *connection, gboolean event_channel)
}

crm_trace("remote client connection established");
private->timeout_sec = 60;
connection->source = mainloop_add_fd("cib-remote", G_PRIORITY_HIGH,
connection->tcp_socket, cib,
&cib_fd_callbacks);
Expand Down
7 changes: 4 additions & 3 deletions lib/common/remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ pcmk__remote_message_xml(pcmk__remote_t *remote)
crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
}

crm_log_xml_trace(xml, "[remote msg]");
return xml;
}

Expand Down Expand Up @@ -713,8 +714,8 @@ pcmk__remote_ready(const pcmk__remote_t *remote, int timeout_ms)
* \note This function will return when the socket read buffer is empty or an
* error is encountered.
*/
static int
read_available_remote_data(pcmk__remote_t *remote)
int
pcmk__read_available_remote_data(pcmk__remote_t *remote)
{
int rc = pcmk_rc_ok;
size_t read_len = sizeof(struct remote_header_v0);
Expand Down Expand Up @@ -849,7 +850,7 @@ pcmk__read_remote_message(pcmk__remote_t *remote, int timeout_ms)
CRM_XS " rc=%d", pcmk_rc_str(rc), rc);

} else {
rc = read_available_remote_data(remote);
rc = pcmk__read_available_remote_data(remote);
if (rc == pcmk_rc_ok) {
return rc;
} else if (rc == EAGAIN) {
Expand Down

0 comments on commit 3a24ae5

Please sign in to comment.