You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

219 lines
6.0 KiB

#include "publisher.h"
void client_print(struct Client *c)
{
printf("State: ");
switch(c->buffer->state) {
case FREE:
printf("FREE");
break;
case RESERVED:
printf("RESERVED");
break;
case WAIT:
printf("WAIT");
break;
case WRITABLE:
printf("WRITABLE");
break;
case BUSY:
printf("BUSY");
break;
case BUFFER_FULL:
printf("BUFFER_FULL");
break;
default:
printf("LOL");
break;
}
printf("\n");
}
void client_disconnect(struct Client *c)
{
av_write_trailer(c->ofmt_ctx);
avio_close(c->ofmt_ctx->pb);
avformat_free_context(c->ofmt_ctx);
buffer_free(c->buffer);
buffer_init(c->buffer);
client_set_state(c, FREE);
c->current_segment_id = -1;
return;
}
void client_set_state(struct Client *c, enum State state)
{
pthread_mutex_lock(&c->state_lock);
c->state = state;
pthread_mutex_unlock(&c->state_lock);
}
void publisher_init(struct PublisherContext **pub)
{
int i;
*pub = (struct PublisherContext*) malloc(sizeof(struct PublisherContext));
(*pub)->nb_threads = 4;
(*pub)->current_segment_id = -1;
(*pub)->buffer = (struct Buffer*) malloc(sizeof(struct Buffer));
(*pub)->fs_buffer = (struct Buffer*) malloc(sizeof(struct Buffer));
(*pub)->shutdown = 0;
buffer_init((*pub)->buffer);
buffer_init((*pub)->fs_buffer);
for(i = 0; i < MAX_CLIENTS; i++) {
struct Client *c = &(*pub)->subscribers[i];
c->buffer = (struct Buffer*) malloc(sizeof(struct Buffer));
buffer_init(c->buffer);
c->id = i;
c->current_segment_id = -1;
pthread_mutex_init(&c->state_lock, NULL);
client_set_state(c, FREE);
}
return;
}
int publisher_reserve_client(struct PublisherContext *pub)
{
int i;
for(i = 0; i < MAX_CLIENTS; i++) {
switch(pub->subscribers[i].buffer->state) {
case FREE:
buffer_set_state(pub->subscribers[i].buffer, RESERVED);
return 0;
default:
continue;
}
}
return 1;
}
void publisher_cancel_reserve(struct PublisherContext *pub)
{
int i;
for(i = 0; i < MAX_CLIENTS; i++) {
switch(pub->subscribers[i].buffer->state) {
case RESERVED:
buffer_set_state(pub->subscribers[i].buffer, FREE);
return;
default:
continue;
}
}
return;
}
void publisher_add_client(struct PublisherContext *pub, AVFormatContext *ofmt_ctx)
{
int i, j;
struct Segment *prebuffer_seg;
for(i = 0; i < MAX_CLIENTS; i++) {
switch(pub->subscribers[i].buffer->state) {
case RESERVED:
printf("Put new client at %d, ofmt_ctx: %p pb: %p\n", i, ofmt_ctx, ofmt_ctx->pb);
pub->subscribers[i].ofmt_ctx = ofmt_ctx;
buffer_set_state(pub->subscribers[i].buffer, WRITABLE);
client_set_state(&pub->subscribers[i], WRITABLE);
for (j = 0; j < BUFFER_SEGMENTS; j++) {
if ((prebuffer_seg = buffer_get_segment_at(pub->fs_buffer, pub->fs_buffer->read + j))) {
buffer_push_segment(pub->subscribers[i].buffer, prebuffer_seg);
printf("pushed prebuffer segment.\n");
}
}
return;
default:
continue;
}
}
}
void publisher_free(struct PublisherContext *pub)
{
int i;
buffer_free(pub->buffer);
buffer_free(pub->fs_buffer);
for(i = 0; i < MAX_CLIENTS; i++) {
struct Client *c = &pub->subscribers[i];
buffer_free(c->buffer);
}
return;
}
void publish(struct PublisherContext *pub)
{
int i;
struct Segment *seg = buffer_peek_segment(pub->buffer);
if (seg) {
pub->current_segment_id = seg->id;
for (i = 0; i < MAX_CLIENTS; i++) {
switch(pub->subscribers[i].buffer->state) {
case BUFFER_FULL:
fprintf(stderr, "Warning: dropping segment for client %d\n", i);
continue;
case WAIT:
case WRITABLE:
buffer_push_segment(pub->subscribers[i].buffer, seg);
continue;
default:
continue;
}
}
buffer_push_segment(pub->fs_buffer, seg);
}
buffer_drop_segment(pub->buffer);
if (pub->fs_buffer->nb_segs == BUFFER_SEGMENTS) {
buffer_drop_segment(pub->fs_buffer);
printf("Dropped segment from prebuffer buffer.\n");
}
}
char *publisher_gen_status_json(struct PublisherContext *pub)
{
int nb_free = 0, nb_reserved = 0, nb_wait = 0, nb_writable = 0, nb_busy = 0, nb_buffer_full = 0, current_read = 0, newest_write = 0, oldest_write = 0;
int i;
struct Client *c;
char *status = (char*) malloc(sizeof(char)* 4096);
current_read = pub->current_segment_id;
oldest_write = current_read;
for (i = 0; i < MAX_CLIENTS; i++) {
c = &pub->subscribers[i];
if (c->current_segment_id > 0 && c->current_segment_id < oldest_write) {
oldest_write = c->current_segment_id;
}
if (c->current_segment_id > newest_write) {
newest_write = c->current_segment_id;
}
switch(c->buffer->state) {
case FREE:
nb_free++;
continue;
case RESERVED:
nb_reserved++;
continue;
case WAIT:
nb_wait++;
continue;
case WRITABLE:
nb_writable++;
continue;
case BUSY:
nb_busy++;
continue;
case BUFFER_FULL:
nb_buffer_full++;
continue;
default:
continue;
}
}
snprintf(status, 4095, "{\n\t\"free\": %d,\n\t\"reserved\": %d,\n\t\"wait\": %d,\n\t\"writable\": %d,\n\t\"busy\": %d\n\t\"buffer_full\": %d\n\t\"current_read\": %d\n\t\"newest_write\": %d\n\t\"oldest_write\": %d\n}\n", nb_free, nb_reserved, nb_wait, nb_writable, nb_busy, nb_buffer_full, current_read, newest_write, oldest_write);
return status;
}